From e6a5f6a66391d46d8bfbc2a39cbf3e78ffff0d82 Mon Sep 17 00:00:00 2001 From: Nekokatt Date: Thu, 27 Aug 2020 12:38:38 +0100 Subject: [PATCH] Support linting when missing sem_open syscall Platforms such as Termux on Android, and other exotic devices do not provide a sem_open implementation at the OS level. This is problematic, as the error resulting from this occurs when calling multiprocessing.Pool, throwing an unhandled ImportError. The issue itself is outlined in https://bugs.python.org/issue3770. This change allows devices missing this system call to respond to the missing feature by falling back to synchronous execution, which appears to be the default behaviour if the multiprocessing module is not found. This change also adds a potential fix for developers working on platforms where multiprocessing itself cannot be imported. The existing code would set the name referencing the import to None, but there are no clear checks to ensure this does not result in an AttributeError later when multiprocessing.Pool is accessed. Existing users should see no difference in functionality, as they will presumably already be able to use flake8, so will not be missing this sem_open call. Users on devices without the sem_open call will now be able to use flake8 where they would be unable to before due to unhandled ImportErrors. 
--- src/flake8/checker.py | 34 ++++++++++++++++++++---------- tests/integration/test_checker.py | 35 +++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/src/flake8/checker.py b/src/flake8/checker.py index d993cb9..ea8b5d3 100644 --- a/src/flake8/checker.py +++ b/src/flake8/checker.py @@ -9,7 +9,7 @@ import tokenize from typing import Dict, List, Optional, Tuple try: - import multiprocessing + import multiprocessing.pool except ImportError: multiprocessing = None # type: ignore @@ -262,18 +262,16 @@ class Manager(object): results_found += len(results) return (results_found, results_reported) - def run_parallel(self): + def run_parallel(self): # type: () -> None """Run the checkers in parallel.""" # fmt: off final_results = collections.defaultdict(list) # type: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] # noqa: E501 - final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, None]] # noqa: E501 + final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, int]] # noqa: E501 # fmt: on - try: - pool = multiprocessing.Pool(self.jobs, _pool_init) - except OSError as oserr: - if oserr.errno not in SERIAL_RETRY_ERRNOS: - raise + pool = _try_initialize_processpool(self.jobs) + + if pool is None: self.run_serial() return @@ -303,12 +301,12 @@ class Manager(object): checker.results = final_results[filename] checker.statistics = final_statistics[filename] - def run_serial(self): + def run_serial(self): # type: () -> None """Run the checkers in serial.""" for checker in self.checkers: checker.run_checks() - def run(self): + def run(self): # type: () -> None """Run all the checkers. 
This will intelligently decide whether to run the checks in parallel @@ -634,11 +632,25 @@ class FileChecker(object): self.run_physical_checks(line + "\n") -def _pool_init(): +def _pool_init(): # type: () -> None """Ensure correct signaling of ^C using multiprocessing.Pool.""" signal.signal(signal.SIGINT, signal.SIG_IGN) +def _try_initialize_processpool(job_count): + # type: (int) -> Optional[multiprocessing.pool.Pool] + """Return a new process pool instance if we are able to create one.""" + try: + return multiprocessing.Pool(job_count, _pool_init) + except OSError as err: + if err.errno not in SERIAL_RETRY_ERRNOS: + raise + except ImportError: + pass + + return None + + def calculate_pool_chunksize(num_checkers, num_jobs): """Determine the chunksize for the multiprocessing Pool. diff --git a/tests/integration/test_checker.py b/tests/integration/test_checker.py index 0acdb6e..836b543 100644 --- a/tests/integration/test_checker.py +++ b/tests/integration/test_checker.py @@ -246,3 +246,38 @@ def test_report_order(results, expected_order): with mock.patch.object(manager, '_handle_results', handler): assert manager.report() == (len(results), len(results)) handler.assert_called_once_with('placeholder', expected_results) + + +def test_acquire_when_multiprocessing_pool_can_initialize(): + """Verify successful importing of hardware semaphore support. + + Mock the behaviour of a platform that has a hardware sem_open + implementation, and then attempt to initialize a multiprocessing + Pool object. + + This simulates the behaviour on most common platforms. + """ + with mock.patch("multiprocessing.Pool") as pool: + result = checker._try_initialize_processpool(2) + + pool.assert_called_once_with(2, checker._pool_init) + assert result is pool.return_value + + +def test_acquire_when_multiprocessing_pool_can_not_initialize(): + """Verify unsuccessful importing of hardware semaphore support. 
+ + Mock the behaviour of a platform that has not got a hardware sem_open + implementation, and then attempt to initialize a multiprocessing + Pool object. + + This scenario will occur on platforms such as Termux and on some + more exotic devices. + + https://github.com/python/cpython/blob/4e02981de0952f54bf87967f8e10d169d6946b40/Lib/multiprocessing/synchronize.py#L30-L33 + """ + with mock.patch("multiprocessing.Pool", side_effect=ImportError) as pool: + result = checker._try_initialize_processpool(2) + + pool.assert_called_once_with(2, checker._pool_init) + assert result is None