Merge pull request #1723 from PyCQA/mp-other-plats

enable multiprocessing on other platforms
This commit is contained in:
Anthony Sottile 2022-10-27 10:22:20 -04:00 committed by GitHub
commit b89d81a919
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 175 additions and 317 deletions

View file

@ -10,11 +10,10 @@ import logging
import os.path
from typing import Any
import flake8
from flake8.discover_files import expand_paths
from flake8.formatting import base as formatter
from flake8.main import application as app
from flake8.options import config
from flake8.options.parse_args import parse_args
LOG = logging.getLogger(__name__)
@ -163,7 +162,7 @@ class StyleGuide:
# Stop cringing... I know it's gross.
self._application.make_guide()
self._application.file_checker_manager = None
self._application.make_file_checker_manager()
self._application.make_file_checker_manager([])
def input_file(
self,
@ -200,23 +199,7 @@ def get_style_guide(**kwargs: Any) -> StyleGuide:
An initialized StyleGuide
"""
application = app.Application()
prelim_opts, remaining_args = application.parse_preliminary_options([])
flake8.configure_logging(prelim_opts.verbose, prelim_opts.output_file)
cfg, cfg_dir = config.load_config(
config=prelim_opts.config,
extra=prelim_opts.append_config,
isolated=prelim_opts.isolated,
)
application.find_plugins(
cfg,
cfg_dir,
enable_extensions=prelim_opts.enable_extensions,
require_plugins=prelim_opts.require_plugins,
)
application.register_plugin_options()
application.parse_configuration_and_cli(cfg, cfg_dir, remaining_args)
application.plugins, application.options = parse_args([])
# We basically want application.initialize to be called but with these
# options set instead before we make our formatter, notifier, internal
# style guide and file checker manager.
@ -229,5 +212,5 @@ def get_style_guide(**kwargs: Any) -> StyleGuide:
LOG.error('Could not update option "%s"', key)
application.make_formatter()
application.make_guide()
application.make_file_checker_manager()
application.make_file_checker_manager([])
return StyleGuide(application)

View file

@ -2,15 +2,17 @@
from __future__ import annotations
import argparse
import collections
import contextlib
import errno
import logging
import multiprocessing.pool
import signal
import tokenize
from typing import Any
from typing import Generator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from flake8 import defaults
@ -18,6 +20,7 @@ from flake8 import exceptions
from flake8 import processor
from flake8 import utils
from flake8.discover_files import expand_paths
from flake8.options.parse_args import parse_args
from flake8.plugins.finder import Checkers
from flake8.plugins.finder import LoadedPlugin
from flake8.style_guide import StyleGuideManager
@ -41,6 +44,41 @@ SERIAL_RETRY_ERRNOS = {
# noise in diffs.
}
_mp_plugins: Checkers
_mp_options: argparse.Namespace
@contextlib.contextmanager
def _mp_prefork(
    plugins: Checkers, options: argparse.Namespace
) -> Generator[None, None, None]:
    """Set module-level state inherited by ``fork``-started workers.

    With the ``fork`` start method child processes inherit this module's
    globals, so binding ``_mp_plugins`` / ``_mp_options`` before the pool
    is created lets ``_mp_init`` skip re-parsing arguments in each worker.

    :param plugins:
        The loaded checker plugins to share with workers.
    :param options:
        The fully parsed options namespace to share with workers.
    """
    # we can save significant startup work w/ `fork` multiprocessing
    global _mp_plugins, _mp_options
    _mp_plugins, _mp_options = plugins, options
    try:
        yield
    finally:
        # always unset the globals so a later run re-initializes cleanly
        del _mp_plugins, _mp_options
def _mp_init(argv: Sequence[str]) -> None:
    """Initialize a multiprocessing pool worker.

    :param argv:
        The original command-line arguments; re-parsed to reconstruct
        plugins and options when the globals were not inherited (i.e. a
        non-``fork`` start method such as ``spawn``).
    """
    global _mp_plugins, _mp_options

    # Ensure correct signaling of ^C using multiprocessing.Pool.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    try:
        _mp_plugins, _mp_options  # for `fork` this'll already be set
    except NameError:
        # not inherited from the parent: rebuild the state from argv
        plugins, options = parse_args(argv)
        _mp_plugins, _mp_options = plugins.checkers, options
def _mp_run(filename: str) -> tuple[str, Results, dict[str, int]]:
    """Check one file in a worker process.

    Uses the module-level ``_mp_plugins`` / ``_mp_options`` set up by
    ``_mp_prefork`` or ``_mp_init``.

    :param filename:
        Path of the file to check.
    :returns:
        The ``(display_name, results, statistics)`` triple produced by
        :meth:`FileChecker.run_checks`.
    """
    return FileChecker(
        filename=filename, plugins=_mp_plugins, options=_mp_options
    ).run_checks()
class Manager:
"""Manage the parallelism and checker instances for each plugin and file.
@ -65,14 +103,13 @@ class Manager:
self,
style_guide: StyleGuideManager,
plugins: Checkers,
argv: Sequence[str],
) -> None:
"""Initialize our Manager instance."""
self.style_guide = style_guide
self.options = style_guide.options
self.plugins = plugins
self.jobs = self._job_count()
self._all_checkers: list[FileChecker] = []
self.checkers: list[FileChecker] = []
self.statistics = {
"files": 0,
"logical lines": 0,
@ -80,30 +117,22 @@ class Manager:
"tokens": 0,
}
self.exclude = (*self.options.exclude, *self.options.extend_exclude)
self.argv = argv
self.results: list[tuple[str, Results, dict[str, int]]] = []
def _process_statistics(self) -> None:
for checker in self.checkers:
for _, _, statistics in self.results:
for statistic in defaults.STATISTIC_NAMES:
self.statistics[statistic] += checker.statistics[statistic]
self.statistics["files"] += len(self.checkers)
self.statistics[statistic] += statistics[statistic]
self.statistics["files"] += len(self.filenames)
def _job_count(self) -> int:
# First we walk through all of our error cases:
# - multiprocessing library is not present
# - we're running on windows in which case we know we have significant
# implementation issues
# - the user provided stdin and that's not something we can handle
# well
# - the user provided some awful input
# class state is only preserved when using the `fork` strategy.
if multiprocessing.get_start_method() != "fork":
LOG.warning(
"The multiprocessing module is not available. "
"Ignoring --jobs arguments."
)
return 0
if utils.is_using_stdin(self.options.filenames):
LOG.warning(
"The --jobs option is not compatible with supplying "
@ -141,27 +170,6 @@ class Manager:
)
return reported_results_count
def make_checkers(self, paths: list[str] | None = None) -> None:
"""Create checkers for each file."""
if paths is None:
paths = self.options.filenames
self._all_checkers = [
FileChecker(
filename=filename,
plugins=self.plugins,
options=self.options,
)
for filename in expand_paths(
paths=paths,
stdin_display_name=self.options.stdin_display_name,
filename_patterns=self.options.filename,
exclude=self.exclude,
)
]
self.checkers = [c for c in self._all_checkers if c.should_process]
LOG.info("Checking %d files", len(self.checkers))
def report(self) -> tuple[int, int]:
"""Report all of the errors found in the managed file checkers.
@ -172,9 +180,8 @@ class Manager:
A tuple of the total results found and the results reported.
"""
results_reported = results_found = 0
for checker in self._all_checkers:
results = sorted(checker.results, key=lambda tup: (tup[1], tup[2]))
filename = checker.display_name
for filename, results, _ in self.results:
results.sort(key=lambda tup: (tup[1], tup[2]))
with self.style_guide.processing_file(filename):
results_reported += self._handle_results(filename, results)
results_found += len(results)
@ -182,12 +189,8 @@ class Manager:
def run_parallel(self) -> None:
"""Run the checkers in parallel."""
# fmt: off
final_results: dict[str, list[tuple[str, int, int, str, str | None]]] = collections.defaultdict(list) # noqa: E501
final_statistics: dict[str, dict[str, int]] = collections.defaultdict(dict) # noqa: E501
# fmt: on
pool = _try_initialize_processpool(self.jobs)
with _mp_prefork(self.plugins, self.options):
pool = _try_initialize_processpool(self.jobs, self.argv)
if pool is None:
self.run_serial()
@ -195,17 +198,7 @@ class Manager:
pool_closed = False
try:
pool_map = pool.imap_unordered(
_run_checks,
self.checkers,
chunksize=calculate_pool_chunksize(
len(self.checkers), self.jobs
),
)
for ret in pool_map:
filename, results, statistics = ret
final_results[filename] = results
final_statistics[filename] = statistics
self.results = list(pool.imap_unordered(_mp_run, self.filenames))
pool.close()
pool.join()
pool_closed = True
@ -214,15 +207,16 @@ class Manager:
pool.terminate()
pool.join()
for checker in self.checkers:
filename = checker.display_name
checker.results = final_results[filename]
checker.statistics = final_statistics[filename]
def run_serial(self) -> None:
"""Run the checkers in serial."""
for checker in self.checkers:
checker.run_checks()
self.results = [
FileChecker(
filename=filename,
plugins=self.plugins,
options=self.options,
).run_checks()
for filename in self.filenames
]
def run(self) -> None:
"""Run all the checkers.
@ -234,7 +228,7 @@ class Manager:
:issue:`117`) this also implements fallback to serial processing.
"""
try:
if self.jobs > 1 and len(self.checkers) > 1:
if self.jobs > 1 and len(self.filenames) > 1:
self.run_parallel()
else:
self.run_serial()
@ -242,7 +236,7 @@ class Manager:
LOG.warning("Flake8 was interrupted by the user")
raise exceptions.EarlyQuit("Early quit while running checks")
def start(self, paths: list[str] | None = None) -> None:
def start(self) -> None:
"""Start checking files.
:param paths:
@ -250,7 +244,14 @@ class Manager:
:meth:`~Manager.make_checkers`.
"""
LOG.info("Making checkers")
self.make_checkers(paths)
self.filenames = tuple(
expand_paths(
paths=self.options.filenames,
stdin_display_name=self.options.stdin_display_name,
filename_patterns=self.options.filename,
exclude=self.exclude,
)
)
def stop(self) -> None:
"""Stop checking files."""
@ -325,7 +326,7 @@ class FileChecker:
def run_check(self, plugin: LoadedPlugin, **arguments: Any) -> Any:
"""Run the check in a single plugin."""
assert self.processor is not None
assert self.processor is not None, self.filename
try:
params = self.processor.keyword_arguments_for(
plugin.parameters, arguments
@ -409,7 +410,7 @@ class FileChecker:
def run_ast_checks(self) -> None:
"""Run all checks expecting an abstract syntax tree."""
assert self.processor is not None
assert self.processor is not None, self.filename
ast = self.processor.build_ast()
for plugin in self.plugins.tree:
@ -514,7 +515,9 @@ class FileChecker:
def run_checks(self) -> tuple[str, Results, dict[str, int]]:
"""Run checks against the file."""
assert self.processor is not None
if self.processor is None or not self.should_process:
return self.display_name, self.results, self.statistics
try:
self.run_ast_checks()
self.process_tokens()
@ -522,11 +525,11 @@ class FileChecker:
code = "E902" if isinstance(e, tokenize.TokenError) else "E999"
row, column = self._extract_syntax_information(e)
self.report(code, row, column, f"{type(e).__name__}: {e.args[0]}")
return self.filename, self.results, self.statistics
return self.display_name, self.results, self.statistics
logical_lines = self.processor.statistics["logical lines"]
self.statistics["logical lines"] = logical_lines
return self.filename, self.results, self.statistics
return self.display_name, self.results, self.statistics
def handle_newline(self, token_type: int) -> None:
"""Handle the logic when encountering a newline token."""
@ -573,17 +576,13 @@ class FileChecker:
self.run_physical_checks(line)
def _pool_init() -> None:
"""Ensure correct signaling of ^C using multiprocessing.Pool."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _try_initialize_processpool(
job_count: int,
argv: Sequence[str],
) -> multiprocessing.pool.Pool | None:
"""Return a new process pool instance if we are able to create one."""
try:
return multiprocessing.Pool(job_count, _pool_init)
return multiprocessing.Pool(job_count, _mp_init, initargs=(argv,))
except OSError as err:
if err.errno not in SERIAL_RETRY_ERRNOS:
raise
@ -593,22 +592,6 @@ def _try_initialize_processpool(
return None
def calculate_pool_chunksize(num_checkers: int, num_jobs: int) -> int:
"""Determine the chunksize for the multiprocessing Pool.
- For chunksize, see: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.imap # noqa
- This formula, while not perfect, aims to give each worker two batches of
work.
- See: https://github.com/pycqa/flake8/issues/829#note_18878876
- See: https://github.com/pycqa/flake8/issues/197
"""
return max(num_checkers // (num_jobs * 2), 1)
def _run_checks(checker: FileChecker) -> tuple[str, Results, dict[str, int]]:
return checker.run_checks()
def find_offset(
offset: int, mapping: processor._LogicalMapping
) -> tuple[int, int]:

View file

@ -2,7 +2,6 @@
from __future__ import annotations
import argparse
import configparser
import json
import logging
import time
@ -15,10 +14,7 @@ from flake8 import exceptions
from flake8 import style_guide
from flake8.formatting.base import BaseFormatter
from flake8.main import debug
from flake8.main import options
from flake8.options import aggregator
from flake8.options import config
from flake8.options import manager
from flake8.options.parse_args import parse_args
from flake8.plugins import finder
from flake8.plugins import reporter
@ -35,12 +31,6 @@ class Application:
self.start_time = time.time()
#: The timestamp when the Application finished reported errors.
self.end_time: float | None = None
#: The prelimary argument parser for handling options required for
#: obtaining and parsing the configuration file.
self.prelim_arg_parser = options.stage1_arg_parser()
#: The instance of :class:`flake8.options.manager.OptionManager` used
#: to parse and handle the options and arguments passed by the user
self.option_manager: manager.OptionManager | None = None
self.plugins: finder.Plugins | None = None
#: The user-selected formatter from :attr:`formatting_plugins`
@ -65,30 +55,6 @@ class Application:
#: with a non-zero status code
self.catastrophic_failure = False
def parse_preliminary_options(
self, argv: Sequence[str]
) -> tuple[argparse.Namespace, list[str]]:
"""Get preliminary options from the CLI, pre-plugin-loading.
We need to know the values of a few standard options so that we can
locate configuration files and configure logging.
Since plugins aren't loaded yet, there may be some as-yet-unknown
options; we ignore those for now, they'll be parsed later when we do
real option parsing.
:param argv:
Command-line arguments passed in directly.
:returns:
Populated namespace and list of remaining argument strings.
"""
args, rest = self.prelim_arg_parser.parse_known_args(argv)
# XXX (ericvw): Special case "forwarding" the output file option so
# that it can be reparsed again for the BaseFormatter.filename.
if args.output_file:
rest.extend(("--output-file", args.output_file))
return args, rest
def exit_code(self) -> int:
"""Return the program exit code."""
if self.catastrophic_failure:
@ -99,76 +65,6 @@ class Application:
else:
return int(self.result_count > 0)
def find_plugins(
self,
cfg: configparser.RawConfigParser,
cfg_dir: str,
*,
enable_extensions: str | None,
require_plugins: str | None,
) -> None:
"""Find and load the plugins for this application.
Set :attr:`plugins` based on loaded plugins.
"""
opts = finder.parse_plugin_options(
cfg,
cfg_dir,
enable_extensions=enable_extensions,
require_plugins=require_plugins,
)
raw = finder.find_plugins(cfg, opts)
self.plugins = finder.load_plugins(raw, opts)
def register_plugin_options(self) -> None:
"""Register options provided by plugins to our option manager."""
assert self.plugins is not None
self.option_manager = manager.OptionManager(
version=flake8.__version__,
plugin_versions=self.plugins.versions_str(),
parents=[self.prelim_arg_parser],
formatter_names=list(self.plugins.reporters),
)
options.register_default_options(self.option_manager)
self.option_manager.register_plugins(self.plugins)
def parse_configuration_and_cli(
self,
cfg: configparser.RawConfigParser,
cfg_dir: str,
argv: list[str],
) -> None:
"""Parse configuration files and the CLI options."""
assert self.option_manager is not None
assert self.plugins is not None
self.options = aggregator.aggregate_options(
self.option_manager,
cfg,
cfg_dir,
argv,
)
if self.options.bug_report:
info = debug.information(flake8.__version__, self.plugins)
print(json.dumps(info, indent=2, sort_keys=True))
raise SystemExit(0)
for loaded in self.plugins.all_plugins():
parse_options = getattr(loaded.obj, "parse_options", None)
if parse_options is None:
continue
# XXX: ideally we wouldn't have two forms of parse_options
try:
parse_options(
self.option_manager,
self.options,
self.options.filenames,
)
except TypeError:
parse_options(self.options)
def make_formatter(self) -> None:
"""Initialize a formatter based on the parsed options."""
assert self.plugins is not None
@ -183,13 +79,14 @@ class Application:
self.options, self.formatter
)
def make_file_checker_manager(self) -> None:
def make_file_checker_manager(self, argv: Sequence[str]) -> None:
"""Initialize our FileChecker Manager."""
assert self.guide is not None
assert self.plugins is not None
self.file_checker_manager = checker.Manager(
style_guide=self.guide,
plugins=self.plugins.checkers,
argv=argv,
)
def run_checks(self) -> None:
@ -265,28 +162,16 @@ class Application:
This finds the plugins, registers their options, and parses the
command-line arguments.
"""
# NOTE(sigmavirus24): When updating this, make sure you also update
# our legacy API calls to these same methods.
prelim_opts, remaining_args = self.parse_preliminary_options(argv)
flake8.configure_logging(prelim_opts.verbose, prelim_opts.output_file)
self.plugins, self.options = parse_args(argv)
cfg, cfg_dir = config.load_config(
config=prelim_opts.config,
extra=prelim_opts.append_config,
isolated=prelim_opts.isolated,
)
if self.options.bug_report:
info = debug.information(flake8.__version__, self.plugins)
print(json.dumps(info, indent=2, sort_keys=True))
raise SystemExit(0)
self.find_plugins(
cfg,
cfg_dir,
enable_extensions=prelim_opts.enable_extensions,
require_plugins=prelim_opts.require_plugins,
)
self.register_plugin_options()
self.parse_configuration_and_cli(cfg, cfg_dir, remaining_args)
self.make_formatter()
self.make_guide()
self.make_file_checker_manager()
self.make_file_checker_manager(argv)
def report(self) -> None:
"""Report errors, statistics, and benchmarks."""

View file

@ -0,0 +1,70 @@
"""Procedure for parsing args, config, loading plugins."""
from __future__ import annotations
import argparse
from typing import Sequence
import flake8
from flake8.main import options
from flake8.options import aggregator
from flake8.options import config
from flake8.options import manager
from flake8.plugins import finder
def parse_args(
    argv: Sequence[str],
) -> tuple[finder.Plugins, argparse.Namespace]:
    """Procedure for parsing args, config, loading plugins.

    :param argv:
        The command-line arguments to parse.
    :returns:
        A two-tuple of the loaded plugins and the fully parsed options
        namespace (configuration merged with CLI arguments).
    """
    # stage 1: parse only the options needed to locate configuration and
    # configure logging; plugin-provided options are not yet known.
    prelim_parser = options.stage1_arg_parser()

    args0, rest = prelim_parser.parse_known_args(argv)
    # XXX (ericvw): Special case "forwarding" the output file option so
    # that it can be reparsed again for the BaseFormatter.filename.
    if args0.output_file:
        rest.extend(("--output-file", args0.output_file))

    flake8.configure_logging(args0.verbose, args0.output_file)

    cfg, cfg_dir = config.load_config(
        config=args0.config,
        extra=args0.append_config,
        isolated=args0.isolated,
    )

    # discover and load plugins from config plus the stage-1 options
    plugin_opts = finder.parse_plugin_options(
        cfg,
        cfg_dir,
        enable_extensions=args0.enable_extensions,
        require_plugins=args0.require_plugins,
    )
    raw_plugins = finder.find_plugins(cfg, plugin_opts)
    plugins = finder.load_plugins(raw_plugins, plugin_opts)

    # stage 2: build the full option parser, now including options
    # registered by the loaded plugins
    option_manager = manager.OptionManager(
        version=flake8.__version__,
        plugin_versions=plugins.versions_str(),
        parents=[prelim_parser],
        formatter_names=list(plugins.reporters),
    )
    options.register_default_options(option_manager)
    option_manager.register_plugins(plugins)

    opts = aggregator.aggregate_options(option_manager, cfg, cfg_dir, rest)

    # give each plugin a chance to post-process the parsed options
    for loaded in plugins.all_plugins():
        parse_options = getattr(loaded.obj, "parse_options", None)
        if parse_options is None:
            continue

        # XXX: ideally we wouldn't have two forms of parse_options
        try:
            parse_options(
                option_manager,
                opts,
                opts.filenames,
            )
        except TypeError:
            parse_options(opts)

    return plugins, opts

View file

@ -266,17 +266,12 @@ def test_report_order(results, expected_order):
# tuples to create the expected result lists from the indexes
expected_results = [results[index] for index in expected_order]
file_checker = mock.Mock(spec=["results", "display_name"])
file_checker.results = results
file_checker.display_name = "placeholder"
style_guide = mock.MagicMock(spec=["options", "processing_file"])
# Create a placeholder manager without arguments or plugins
# Just add one custom file checker which just provides the results
manager = checker.Manager(style_guide, finder.Checkers([], [], []))
manager.checkers = manager._all_checkers = [file_checker]
manager = checker.Manager(style_guide, finder.Checkers([], [], []), [])
manager.results = [("placeholder", results, {})]
# _handle_results is the first place which gets the sorted result
# Should something non-private be mocked instead?
handler = mock.Mock(side_effect=count_side_effect)
@ -295,9 +290,9 @@ def test_acquire_when_multiprocessing_pool_can_initialize():
This simulates the behaviour on most common platforms.
"""
with mock.patch("multiprocessing.Pool") as pool:
result = checker._try_initialize_processpool(2)
result = checker._try_initialize_processpool(2, [])
pool.assert_called_once_with(2, checker._pool_init)
pool.assert_called_once_with(2, checker._mp_init, initargs=([],))
assert result is pool.return_value
@ -314,9 +309,9 @@ def test_acquire_when_multiprocessing_pool_can_not_initialize():
https://github.com/python/cpython/blob/4e02981de0952f54bf87967f8e10d169d6946b40/Lib/multiprocessing/synchronize.py#L30-L33
"""
with mock.patch("multiprocessing.Pool", side_effect=ImportError) as pool:
result = checker._try_initialize_processpool(2)
result = checker._try_initialize_processpool(2, [])
pool.assert_called_once_with(2, checker._pool_init)
pool.assert_called_once_with(2, checker._mp_init, initargs=([],))
assert result is None

View file

@ -20,9 +20,9 @@ def style_guide_mock():
def _parallel_checker_manager():
"""Call Manager.run() and return the number of calls to `run_serial`."""
style_guide = style_guide_mock()
manager = checker.Manager(style_guide, finder.Checkers([], [], []))
# multiple checkers is needed for parallel mode
manager.checkers = [mock.Mock(), mock.Mock()]
manager = checker.Manager(style_guide, finder.Checkers([], [], []), [])
# multiple files is needed for parallel mode
manager.filenames = ("file1", "file2")
return manager
@ -36,8 +36,7 @@ def test_oserrors_cause_serial_fall_back():
assert serial.call_count == 1
@mock.patch.object(multiprocessing, "get_start_method", return_value="fork")
def test_oserrors_are_reraised(_):
def test_oserrors_are_reraised():
"""Verify that unexpected OSErrors will cause the Manager to reraise."""
err = OSError(errno.EAGAIN, "Ominous message")
with mock.patch("_multiprocessing.SemLock", side_effect=err):
@ -48,14 +47,6 @@ def test_oserrors_are_reraised(_):
assert serial.call_count == 0
@mock.patch.object(multiprocessing, "get_start_method", return_value="spawn")
def test_multiprocessing_is_disabled(_):
"""Verify not being able to import multiprocessing forces jobs to 0."""
style_guide = style_guide_mock()
manager = checker.Manager(style_guide, finder.Checkers([], [], []))
assert manager.jobs == 0
def test_multiprocessing_cpu_count_not_implemented():
"""Verify that jobs is 0 if cpu_count is unavailable."""
style_guide = style_guide_mock()
@ -66,22 +57,18 @@ def test_multiprocessing_cpu_count_not_implemented():
"cpu_count",
side_effect=NotImplementedError,
):
manager = checker.Manager(style_guide, finder.Checkers([], [], []))
manager = checker.Manager(style_guide, finder.Checkers([], [], []), [])
assert manager.jobs == 0
@mock.patch.object(multiprocessing, "get_start_method", return_value="spawn")
def test_make_checkers(_):
def test_make_checkers():
"""Verify that we create a list of FileChecker instances."""
style_guide = style_guide_mock()
style_guide.options.filenames = ["file1", "file2"]
manager = checker.Manager(style_guide, finder.Checkers([], [], []))
manager = checker.Manager(style_guide, finder.Checkers([], [], []), [])
with mock.patch("flake8.utils.fnmatch", return_value=True):
with mock.patch("flake8.processor.FileProcessor"):
manager.make_checkers(["file1", "file2"])
manager.start()
assert manager._all_checkers
for file_checker in manager._all_checkers:
assert file_checker.filename in style_guide.options.filenames
assert not manager.checkers # the files don't exist
assert manager.filenames == ("file1", "file2")

View file

@ -1,57 +1,12 @@
"""Tests for Flake8's legacy API."""
from __future__ import annotations
import argparse
import configparser
import os.path
from unittest import mock
import pytest
from flake8.api import legacy as api
from flake8.formatting import base as formatter
from flake8.options import config
def test_get_style_guide():
"""Verify the methods called on our internal Application."""
prelim_opts = argparse.Namespace(
append_config=[],
config=None,
isolated=False,
output_file=None,
verbose=0,
enable_extensions=None,
require_plugins=None,
)
mockedapp = mock.Mock()
mockedapp.parse_preliminary_options.return_value = (prelim_opts, [])
mockedapp.program = "flake8"
cfg = configparser.RawConfigParser()
cfg_dir = os.getcwd()
with mock.patch.object(config, "load_config", return_value=(cfg, cfg_dir)):
with mock.patch("flake8.main.application.Application") as application:
application.return_value = mockedapp
style_guide = api.get_style_guide()
application.assert_called_once_with()
mockedapp.parse_preliminary_options.assert_called_once_with([])
mockedapp.find_plugins.assert_called_once_with(
cfg,
cfg_dir,
enable_extensions=None,
require_plugins=None,
)
mockedapp.register_plugin_options.assert_called_once_with()
mockedapp.parse_configuration_and_cli.assert_called_once_with(
cfg, cfg_dir, []
)
mockedapp.make_formatter.assert_called_once_with()
mockedapp.make_guide.assert_called_once_with()
mockedapp.make_file_checker_manager.assert_called_once_with()
assert isinstance(style_guide, api.StyleGuide)
def test_styleguide_options():