Support linting when missing sem_open syscall

Platforms such as Termux on Android, and other exotic devices
do not provide a sem_open implementation on the OS level. This
is problematic, as the error resulting from this occurs when
calling multiprocessing.Pool, throwing an unhandled ImportError.

The issue itself is outlined in https://bugs.python.org/issue3770.

This change allows devices missing this system call to respond
to the missing feature by falling back to synchronous execution,
which appears to be the default behaviour if the multiprocessing
module is not found.

This change also adds a potential fix for developers working
on platforms where multiprocessing itself cannot be imported.
The existing code would set the name referencing the import to
None, but there were no clear checks to ensure this did not
result in an AttributeError later when multiprocessing.Pool
was accessed.

Existing users should see no difference in functionality, as they
will presumably already be able to use flake8, and so will not be
missing this sem_open call.

Users on devices without the sem_open call will now be able
to use flake8 where they would be unable to before due to
unhandled ImportErrors.
This commit is contained in:
Nekokatt 2020-08-27 12:38:38 +01:00 committed by Anthony Sottile
parent abdc9b14d6
commit e6a5f6a663
2 changed files with 58 additions and 11 deletions

View file

@ -9,7 +9,7 @@ import tokenize
from typing import Dict, List, Optional, Tuple
try:
import multiprocessing
import multiprocessing.pool
except ImportError:
multiprocessing = None # type: ignore
@ -262,18 +262,16 @@ class Manager(object):
results_found += len(results)
return (results_found, results_reported)
def run_parallel(self):
def run_parallel(self): # type: () -> None
"""Run the checkers in parallel."""
# fmt: off
final_results = collections.defaultdict(list) # type: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] # noqa: E501
final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, None]] # noqa: E501
final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, int]] # noqa: E501
# fmt: on
try:
pool = multiprocessing.Pool(self.jobs, _pool_init)
except OSError as oserr:
if oserr.errno not in SERIAL_RETRY_ERRNOS:
raise
pool = _try_initialize_processpool(self.jobs)
if pool is None:
self.run_serial()
return
@ -303,12 +301,12 @@ class Manager(object):
checker.results = final_results[filename]
checker.statistics = final_statistics[filename]
def run_serial(self):
def run_serial(self): # type: () -> None
"""Run the checkers in serial."""
for checker in self.checkers:
checker.run_checks()
def run(self):
def run(self): # type: () -> None
"""Run all the checkers.
This will intelligently decide whether to run the checks in parallel
@ -634,11 +632,25 @@ class FileChecker(object):
self.run_physical_checks(line + "\n")
def _pool_init():
def _pool_init(): # type: () -> None
"""Ensure correct signaling of ^C using multiprocessing.Pool."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _try_initialize_processpool(job_count):
    # type: (int) -> Optional[multiprocessing.pool.Pool]
    """Attempt to build a multiprocessing pool, returning None on failure.

    Creation can fail with ImportError on platforms lacking a sem_open
    implementation (e.g. Termux on Android), or with an OSError carrying
    a retryable errno; in both cases None is returned so the caller can
    fall back to serial execution. Any other OSError is re-raised.
    """
    pool = None
    try:
        pool = multiprocessing.Pool(job_count, _pool_init)
    except ImportError:
        # The platform has no sem_open syscall, so pools are unavailable.
        pool = None
    except OSError as exc:
        # Only swallow errnos known to be safe to retry serially.
        if exc.errno not in SERIAL_RETRY_ERRNOS:
            raise
    return pool
def calculate_pool_chunksize(num_checkers, num_jobs):
"""Determine the chunksize for the multiprocessing Pool.

View file

@ -246,3 +246,38 @@ def test_report_order(results, expected_order):
with mock.patch.object(manager, '_handle_results', handler):
assert manager.report() == (len(results), len(results))
handler.assert_called_once_with('placeholder', expected_results)
def test_acquire_when_multiprocessing_pool_can_initialize():
    """Verify the pool is created when hardware semaphores are available.

    Simulates the common case: a platform with a working sem_open
    implementation, where multiprocessing.Pool succeeds and the new
    pool instance is returned to the caller.
    """
    with mock.patch("multiprocessing.Pool") as pool_mock:
        pool = checker._try_initialize_processpool(2)

    pool_mock.assert_called_once_with(2, checker._pool_init)
    assert pool is pool_mock.return_value
def test_acquire_when_multiprocessing_pool_can_not_initialize():
    """Verify None is returned when hardware semaphores are unavailable.

    Simulates platforms such as Termux and some more exotic devices
    whose OS lacks a sem_open implementation, causing
    multiprocessing.Pool to raise ImportError at creation time.
    https://github.com/python/cpython/blob/4e02981de0952f54bf87967f8e10d169d6946b40/Lib/multiprocessing/synchronize.py#L30-L33
    """
    with mock.patch("multiprocessing.Pool", side_effect=ImportError) as pool_mock:
        pool = checker._try_initialize_processpool(2)

    pool_mock.assert_called_once_with(2, checker._pool_init)
    assert pool is None