Merge branch 'bugfix/cpython-3770-semopen-missing' into 'master'

Support linting when missing sem_open syscall

See merge request pycqa/flake8!448
This commit is contained in:
Anthony Sottile 2020-08-27 22:44:56 +00:00
commit 3765318dcf
2 changed files with 58 additions and 11 deletions

View file

@ -9,7 +9,7 @@ import tokenize
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
try: try:
import multiprocessing import multiprocessing.pool
except ImportError: except ImportError:
multiprocessing = None # type: ignore multiprocessing = None # type: ignore
@ -262,18 +262,16 @@ class Manager(object):
results_found += len(results) results_found += len(results)
return (results_found, results_reported) return (results_found, results_reported)
def run_parallel(self): def run_parallel(self): # type: () -> None
"""Run the checkers in parallel.""" """Run the checkers in parallel."""
# fmt: off # fmt: off
final_results = collections.defaultdict(list) # type: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] # noqa: E501 final_results = collections.defaultdict(list) # type: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] # noqa: E501
final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, None]] # noqa: E501 final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, int]] # noqa: E501
# fmt: on # fmt: on
try: pool = _try_initialize_processpool(self.jobs)
pool = multiprocessing.Pool(self.jobs, _pool_init)
except OSError as oserr: if pool is None:
if oserr.errno not in SERIAL_RETRY_ERRNOS:
raise
self.run_serial() self.run_serial()
return return
@ -303,12 +301,12 @@ class Manager(object):
checker.results = final_results[filename] checker.results = final_results[filename]
checker.statistics = final_statistics[filename] checker.statistics = final_statistics[filename]
def run_serial(self): def run_serial(self): # type: () -> None
"""Run the checkers in serial.""" """Run the checkers in serial."""
for checker in self.checkers: for checker in self.checkers:
checker.run_checks() checker.run_checks()
def run(self): def run(self): # type: () -> None
"""Run all the checkers. """Run all the checkers.
This will intelligently decide whether to run the checks in parallel This will intelligently decide whether to run the checks in parallel
@ -634,11 +632,25 @@ class FileChecker(object):
self.run_physical_checks(line + "\n") self.run_physical_checks(line + "\n")
def _pool_init(): def _pool_init(): # type: () -> None
"""Ensure correct signaling of ^C using multiprocessing.Pool.""" """Ensure correct signaling of ^C using multiprocessing.Pool."""
signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN)
def _try_initialize_processpool(job_count):
# type: (int) -> Optional[multiprocessing.pool.Pool]
"""Return a new process pool instance if we are able to create one."""
try:
return multiprocessing.Pool(job_count, _pool_init)
except OSError as err:
if err.errno not in SERIAL_RETRY_ERRNOS:
raise
except ImportError:
pass
return None
def calculate_pool_chunksize(num_checkers, num_jobs): def calculate_pool_chunksize(num_checkers, num_jobs):
"""Determine the chunksize for the multiprocessing Pool. """Determine the chunksize for the multiprocessing Pool.

View file

@ -246,3 +246,38 @@ def test_report_order(results, expected_order):
with mock.patch.object(manager, '_handle_results', handler): with mock.patch.object(manager, '_handle_results', handler):
assert manager.report() == (len(results), len(results)) assert manager.report() == (len(results), len(results))
handler.assert_called_once_with('placeholder', expected_results) handler.assert_called_once_with('placeholder', expected_results)
def test_acquire_when_multiprocessing_pool_can_initialize():
"""Verify successful importing of hardware semaphore support.
Mock the behaviour of a platform that has a hardware sem_open
implementation, and then attempt to initialize a multiprocessing
Pool object.
This simulates the behaviour on most common platforms.
"""
with mock.patch("multiprocessing.Pool") as pool:
result = checker._try_initialize_processpool(2)
pool.assert_called_once_with(2, checker._pool_init)
assert result is pool.return_value
def test_acquire_when_multiprocessing_pool_can_not_initialize():
"""Verify unsuccessful importing of hardware semaphore support.
Mock the behaviour of a platform that has not got a hardware sem_open
implementation, and then attempt to initialize a multiprocessing
Pool object.
This scenario will occur on platforms such as Termux and on some
more exotic devices.
https://github.com/python/cpython/blob/4e02981de0952f54bf87967f8e10d169d6946b40/Lib/multiprocessing/synchronize.py#L30-L33
"""
with mock.patch("multiprocessing.Pool", side_effect=ImportError) as pool:
result = checker._try_initialize_processpool(2)
pool.assert_called_once_with(2, checker._pool_init)
assert result is None