Preserve legacy API options in parallel checks

This commit is contained in:
linhongkuan 2026-06-25 09:49:54 +08:00
parent bb943328ef
commit 0df41b2d56
5 changed files with 85 additions and 10 deletions

View file

@ -162,7 +162,9 @@ class StyleGuide:
# Stop cringing... I know it's gross. # Stop cringing... I know it's gross.
self._application.make_guide() self._application.make_guide()
self._application.file_checker_manager = None self._application.file_checker_manager = None
self._application.make_file_checker_manager([]) self._application.make_file_checker_manager(
[], use_initialized_options=True,
)
def input_file( def input_file(
self, self,
@ -212,5 +214,5 @@ def get_style_guide(**kwargs: Any) -> StyleGuide:
LOG.error('Could not update option "%s"', key) LOG.error('Could not update option "%s"', key)
application.make_formatter() application.make_formatter()
application.make_guide() application.make_guide()
application.make_file_checker_manager([]) application.make_file_checker_manager([], use_initialized_options=True)
return StyleGuide(application) return StyleGuide(application)

View file

@ -7,6 +7,7 @@ import errno
import logging import logging
import multiprocessing.pool import multiprocessing.pool
import operator import operator
import pickle
import signal import signal
import tokenize import tokenize
from collections.abc import Generator from collections.abc import Generator
@ -61,16 +62,24 @@ def _mp_prefork(
_mp = None _mp = None
def _mp_init(argv: Sequence[str]) -> None: def _mp_init(
argv: Sequence[str],
plugins: Checkers | None = None,
options: argparse.Namespace | None = None,
) -> None:
global _mp global _mp
# Ensure correct signaling of ^C using multiprocessing.Pool. # Ensure correct signaling of ^C using multiprocessing.Pool.
signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN)
if plugins is not None and options is not None:
_mp = plugins, options
return
# for `fork` this'll already be set # for `fork` this'll already be set
if _mp is None: if _mp is None:
plugins, options = parse_args(argv) parsed_plugins, parsed_options = parse_args(argv)
_mp = plugins.checkers, options _mp = parsed_plugins.checkers, parsed_options
def _mp_run(filename: str) -> tuple[str, Results, dict[str, int]]: def _mp_run(filename: str) -> tuple[str, Results, dict[str, int]]:
@ -105,6 +114,7 @@ class Manager:
style_guide: StyleGuideManager, style_guide: StyleGuideManager,
plugins: Checkers, plugins: Checkers,
argv: Sequence[str], argv: Sequence[str],
use_initialized_options: bool = False,
) -> None: ) -> None:
"""Initialize our Manager instance.""" """Initialize our Manager instance."""
self.style_guide = style_guide self.style_guide = style_guide
@ -119,6 +129,7 @@ class Manager:
} }
self.exclude = (*self.options.exclude, *self.options.extend_exclude) self.exclude = (*self.options.exclude, *self.options.extend_exclude)
self.argv = argv self.argv = argv
self.use_initialized_options = use_initialized_options
self.results: list[tuple[str, Results, dict[str, int]]] = [] self.results: list[tuple[str, Results, dict[str, int]]] = []
def _process_statistics(self) -> None: def _process_statistics(self) -> None:
@ -191,8 +202,12 @@ class Manager:
def run_parallel(self) -> None: def run_parallel(self) -> None:
"""Run the checkers in parallel.""" """Run the checkers in parallel."""
plugins = self.plugins if self.use_initialized_options else None
options = self.options if self.use_initialized_options else None
with _mp_prefork(self.plugins, self.options): with _mp_prefork(self.plugins, self.options):
pool = _try_initialize_processpool(self.jobs, self.argv) pool = _try_initialize_processpool(
self.jobs, self.argv, plugins=plugins, options=options,
)
if pool is None: if pool is None:
self.run_serial() self.run_serial()
@ -547,13 +562,21 @@ class FileChecker:
def _try_initialize_processpool( def _try_initialize_processpool(
job_count: int, job_count: int,
argv: Sequence[str], argv: Sequence[str],
*,
plugins: Checkers | None = None,
options: argparse.Namespace | None = None,
) -> multiprocessing.pool.Pool | None: ) -> multiprocessing.pool.Pool | None:
"""Return a new process pool instance if we are able to create one.""" """Return a new process pool instance if we are able to create one."""
try: try:
return multiprocessing.Pool(job_count, _mp_init, initargs=(argv,)) return multiprocessing.Pool(
job_count, _mp_init, initargs=(argv, plugins, options),
)
except OSError as err: except OSError as err:
if err.errno not in SERIAL_RETRY_ERRNOS: if err.errno not in SERIAL_RETRY_ERRNOS:
raise raise
except (AttributeError, pickle.PicklingError, TypeError):
if plugins is None and options is None:
raise
except ImportError: except ImportError:
pass pass

View file

@ -79,7 +79,9 @@ class Application:
self.options, self.formatter, self.options, self.formatter,
) )
def make_file_checker_manager(self, argv: Sequence[str]) -> None: def make_file_checker_manager(
self, argv: Sequence[str], use_initialized_options: bool = False,
) -> None:
"""Initialize our FileChecker Manager.""" """Initialize our FileChecker Manager."""
assert self.guide is not None assert self.guide is not None
assert self.plugins is not None assert self.plugins is not None
@ -87,6 +89,7 @@ class Application:
style_guide=self.guide, style_guide=self.guide,
plugins=self.plugins.checkers, plugins=self.plugins.checkers,
argv=argv, argv=argv,
use_initialized_options=use_initialized_options,
) )
def run_checks(self) -> None: def run_checks(self) -> None:

View file

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
from flake8.api import legacy from flake8.api import legacy
from flake8.main.options import JobsArgument
def test_legacy_api(tmpdir): def test_legacy_api(tmpdir):
@ -13,3 +14,17 @@ def test_legacy_api(tmpdir):
style_guide = legacy.get_style_guide() style_guide = legacy.get_style_guide()
report = style_guide.check_files([t_py.strpath]) report = style_guide.check_files([t_py.strpath])
assert report.total_errors == 1 assert report.total_errors == 1
def test_legacy_api_uses_initialized_options_for_parallel_checks(tmpdir):
with tmpdir.as_cwd():
a_py = tmpdir.join("a.py")
b_py = tmpdir.join("b.py")
a_py.write('x = "' + "a" * 80 + '"\n')
b_py.write('y = "' + "b" * 80 + '"\n')
style_guide = legacy.get_style_guide(color="never", max_line_length=88)
style_guide.options.jobs = JobsArgument("2")
report = style_guide.check_files([a_py.strpath, b_py.strpath])
assert report.total_errors == 0

View file

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import importlib.metadata import importlib.metadata
import pickle
from unittest import mock from unittest import mock
import pytest import pytest
@ -291,7 +292,24 @@ def test_acquire_when_multiprocessing_pool_can_initialize():
with mock.patch("multiprocessing.Pool") as pool: with mock.patch("multiprocessing.Pool") as pool:
result = checker._try_initialize_processpool(2, []) result = checker._try_initialize_processpool(2, [])
pool.assert_called_once_with(2, checker._mp_init, initargs=([],)) pool.assert_called_once_with(
2, checker._mp_init, initargs=([], None, None),
)
assert result is pool.return_value
def test_acquire_when_multiprocessing_pool_uses_initialized_options():
plugins = mock.Mock()
options = mock.Mock()
with mock.patch("multiprocessing.Pool") as pool:
result = checker._try_initialize_processpool(
2, [], plugins=plugins, options=options,
)
pool.assert_called_once_with(
2, checker._mp_init, initargs=([], plugins, options),
)
assert result is pool.return_value assert result is pool.return_value
@ -310,7 +328,21 @@ def test_acquire_when_multiprocessing_pool_can_not_initialize():
with mock.patch("multiprocessing.Pool", side_effect=ImportError) as pool: with mock.patch("multiprocessing.Pool", side_effect=ImportError) as pool:
result = checker._try_initialize_processpool(2, []) result = checker._try_initialize_processpool(2, [])
pool.assert_called_once_with(2, checker._mp_init, initargs=([],)) pool.assert_called_once_with(
2, checker._mp_init, initargs=([], None, None),
)
assert result is None
def test_acquire_falls_back_when_initialized_options_are_unpickleable():
with mock.patch(
"multiprocessing.Pool", side_effect=pickle.PicklingError,
) as pool:
result = checker._try_initialize_processpool(
2, [], plugins=mock.Mock(), options=mock.Mock(),
)
pool.assert_called_once()
assert result is None assert result is None