From 50d69150c1073ca5e5144a9ef05bfab25ffb3c00 Mon Sep 17 00:00:00 2001 From: Anthony Sottile Date: Fri, 31 Dec 2021 13:11:00 -0800 Subject: [PATCH] rework plugin loading --- docs/source/internal/plugin_handling.rst | 74 +-- docs/source/internal/utils.rst | 17 - .../plugin-development/plugin-parameters.rst | 4 +- .../flake8_example_plugin/off_by_default.py | 3 - .../flake8_example_plugin/on_by_default.py | 3 - pytest.ini | 6 +- setup.cfg | 8 +- src/flake8/checker.py | 76 +-- src/flake8/main/application.py | 112 +--- src/flake8/main/debug.py | 28 +- src/flake8/options/manager.py | 39 +- src/flake8/plugins/finder.py | 256 ++++++++ src/flake8/plugins/manager.py | 530 --------------- src/flake8/plugins/pyflakes.py | 2 - src/flake8/plugins/reporter.py | 41 ++ src/flake8/processor.py | 7 +- src/flake8/utils.py | 39 -- tests/fixtures/config_files/README.rst | 22 - .../config_files/local-plugin-path.ini | 5 - tests/fixtures/config_files/local-plugin.ini | 5 - tests/integration/subdir/aplugin.py | 3 - tests/integration/test_checker.py | 76 +-- tests/integration/test_plugins.py | 90 ++- tests/unit/plugins/__init__.py | 0 tests/unit/plugins/finder_test.py | 610 ++++++++++++++++++ tests/unit/plugins/reporter_test.py | 74 +++ tests/unit/test_application.py | 61 -- tests/unit/test_checker_manager.py | 15 +- tests/unit/test_debug.py | 77 +-- tests/unit/test_file_checker.py | 39 +- tests/unit/test_main_options.py | 17 + tests/unit/test_option_manager.py | 11 - tests/unit/test_plugin.py | 169 ----- tests/unit/test_plugin_manager.py | 57 -- tests/unit/test_plugin_type_manager.py | 177 ----- tests/unit/test_utils.py | 29 - 36 files changed, 1277 insertions(+), 1505 deletions(-) create mode 100644 src/flake8/plugins/finder.py delete mode 100644 src/flake8/plugins/manager.py create mode 100644 src/flake8/plugins/reporter.py delete mode 100644 tests/fixtures/config_files/README.rst delete mode 100644 tests/fixtures/config_files/local-plugin-path.ini delete mode 100644 tests/fixtures/config_files/local-plugin.ini create mode 100644 tests/unit/plugins/__init__.py create mode 100644 tests/unit/plugins/finder_test.py create mode 100644 tests/unit/plugins/reporter_test.py create mode 100644 tests/unit/test_main_options.py delete mode 100644 tests/unit/test_plugin.py delete mode 100644 tests/unit/test_plugin_manager.py delete mode 100644 tests/unit/test_plugin_type_manager.py diff --git a/docs/source/internal/plugin_handling.rst b/docs/source/internal/plugin_handling.rst index faf3996..cdec601 100644 --- a/docs/source/internal/plugin_handling.rst +++ b/docs/source/internal/plugin_handling.rst @@ -11,44 +11,6 @@ new checks. It now supports: - alternative report formatters -To facilitate this, |Flake8| needed a more mature way of managing plugins. -Thus, we developed the |PluginManager| which accepts a namespace and will load -the plugins for that namespace. A |PluginManager| creates and manages many -|Plugin| instances. - -A |Plugin| lazily loads the underlying entry-point provided by setuptools. -The entry-point will be loaded either by calling -:meth:`~flake8.plugins.manager.Plugin.load_plugin` or accessing the ``plugin`` -attribute. We also use this abstraction to retrieve options that the plugin -wishes to register and parse. - -The only public method the |PluginManager| provides is -:meth:`~flake8.plugins.manager.PluginManager.map`. This will accept a function -(or other callable) and call it with each plugin as the first parameter. - -We build atop the |PluginManager| with the |PTM|. It is expected that users of -the |PTM| will subclass it and specify the ``namespace``, e.g., - -.. code-block:: python - - class ExamplePluginType(flake8.plugin.manager.PluginTypeManager): - namespace = 'example-plugins' - -This provides a few extra methods via the |PluginManager|'s ``map`` method. - -Finally, we create two classes of plugins: - -- :class:`~flake8.plugins.manager.Checkers` - -- :class:`~flake8.plugins.manager.ReportFormatters` - -These are used to interact with each of the types of plugins individually. - -.. note:: - - Our inspiration for our plugin handling comes from the author's extensive - experience with ``stevedore``. - Default Plugins --------------- @@ -56,40 +18,26 @@ Finally, |Flake8| has always provided its own plugin shim for Pyflakes. As part of that we carry our own shim in-tree and now store that in :mod:`flake8.plugins.pyflakes`. -|Flake8| also registers plugins for pep8. Each check in pep8 requires -different parameters and it cannot easily be shimmed together like Pyflakes -was. As such, plugins have a concept of a "group". If you look at our -:file:`setup.py` you will see that we register pep8 checks roughly like so: +|Flake8| also registers plugins for pycodestyle. Each check in pycodestyle +requires different parameters and it cannot easily be shimmed together like +Pyflakes was. As such, plugins have a concept of a "group". If you look at our +:file:`setup.py` you will see that we register pycodestyle checks roughly like +so: .. code:: - pep8. = pep8: + pycodestyle. = pycodestyle: We do this to identify that ``>`` is part of a group. This also enables us to special-case how we handle reporting those checks. Instead of -reporting each check in the ``--version`` output, we report ``pep8`` and check -``pep8`` the module for a ``__version__`` attribute. We only report it once -to avoid confusing users. +reporting each check in the ``--version`` output, we only report +``pycodestyle`` once. API Documentation ----------------- -.. autoclass:: flake8.plugins.manager.PluginManager - :members: - :special-members: __init__ +.. autofunction:: flake8.plugins.finder.find_plugins -.. autoclass:: flake8.plugins.manager.Plugin - :members: - :special-members: __init__ +.. autofunction:: flake8.plugins.finder.find_local_plugin_paths -.. autoclass:: flake8.plugins.manager.PluginTypeManager - :members: - -.. autoclass:: flake8.plugins.manager.Checkers - :members: - -.. autoclass:: flake8.plugins.manager.ReportFormatters - -.. |PluginManager| replace:: :class:`~flake8.plugins.manager.PluginManager` -.. |Plugin| replace:: :class:`~flake8.plugins.manager.Plugin` -.. |PTM| replace:: :class:`~flake8.plugins.manager.PluginTypeManager` +.. autofunction:: flake8.plugins.finder.load_plugins diff --git a/docs/source/internal/utils.rst b/docs/source/internal/utils.rst index 34b09f1..c745917 100644 --- a/docs/source/internal/utils.rst +++ b/docs/source/internal/utils.rst @@ -67,23 +67,6 @@ filename matches a single pattern. In our use case, however, we typically have a list of patterns and want to know if the filename matches any of them. This function abstracts that logic away with a little extra logic. -.. autofunction:: flake8.utils.parameters_for - -|Flake8| analyzes the parameters to plugins to determine what input they are -expecting. Plugins may expect one of the following: - -- ``physical_line`` to receive the line as it appears in the file - -- ``logical_line`` to receive the logical line (not as it appears in the file) - -- ``tree`` to receive the abstract syntax tree (AST) for the file - -We also analyze the rest of the parameters to provide more detail to the -plugin. This function will return the parameters in a consistent way across -versions of Python and will handle both classes and functions that are used as -plugins. Further, if the plugin is a class, it will strip the ``self`` -argument so we can check the parameters of the plugin consistently. - .. autofunction:: flake8.utils.parse_unified_diff To handle usage of :option:`flake8 --diff`, |Flake8| needs to be able diff --git a/docs/source/plugin-development/plugin-parameters.rst b/docs/source/plugin-development/plugin-parameters.rst index 6dae857..931c186 100644 --- a/docs/source/plugin-development/plugin-parameters.rst +++ b/docs/source/plugin-development/plugin-parameters.rst @@ -22,8 +22,8 @@ Indicating Desired Data ======================= |Flake8| inspects the plugin's signature to determine what parameters it -expects using :func:`flake8.utils.parameters_for`. -:attr:`flake8.plugins.manager.Plugin.parameters` caches the values so that +expects using :func:`flake8.plugins.finder._parameters_for`. +:attr:`flake8.plugins.finder.LoadedPlugin.parameters` caches the values so that each plugin makes that fairly expensive call once per plugin. When processing a file, a plugin can ask for any of the following: diff --git a/example-plugin/src/flake8_example_plugin/off_by_default.py b/example-plugin/src/flake8_example_plugin/off_by_default.py index 93dfb38..54737cb 100644 --- a/example-plugin/src/flake8_example_plugin/off_by_default.py +++ b/example-plugin/src/flake8_example_plugin/off_by_default.py @@ -4,9 +4,6 @@ class ExampleTwo: """Second Example Plugin.""" - name = "off-by-default-example-plugin" - version = "1.0.0" - off_by_default = True def __init__(self, tree): diff --git a/example-plugin/src/flake8_example_plugin/on_by_default.py b/example-plugin/src/flake8_example_plugin/on_by_default.py index d712718..a3e5332 100644 --- a/example-plugin/src/flake8_example_plugin/on_by_default.py +++ b/example-plugin/src/flake8_example_plugin/on_by_default.py @@ -4,9 +4,6 @@ class ExampleOne: """First Example Plugin.""" - name = "on-by-default-example-plugin" - version = "1.0.0" - def __init__(self, tree): self.tree = tree diff --git a/pytest.ini b/pytest.ini index 1978251..0301af3 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,6 +1,4 @@ [pytest] -norecursedirs = .git .* *.egg* old docs dist build +norecursedirs = .git .* *.egg* docs dist build addopts = -rw -filterwarnings = - error - ignore:SelectableGroups:DeprecationWarning +filterwarnings = error diff --git a/setup.cfg b/setup.cfg index 47f8feb..9924505 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,7 +43,7 @@ install_requires = pycodestyle>=2.8.0,<2.9.0 pyflakes>=2.4.0,<2.5.0 importlib-metadata<4.3;python_version<"3.8" -python_requires = >=3.6 +python_requires = >=3.6.1 [options.packages.find] where = src @@ -109,12 +109,6 @@ warn_unused_ignores = true # TODO: fix these [mypy-flake8.api.legacy] disallow_untyped_defs = false -[mypy-flake8.checker] -disallow_untyped_defs = false -[mypy-flake8.main.application] -disallow_untyped_defs = false -[mypy-flake8.plugins.manager] -disallow_untyped_defs = false [mypy-tests.*] disallow_untyped_defs = false diff --git a/src/flake8/checker.py b/src/flake8/checker.py index 63fa439..a42523d 100644 --- a/src/flake8/checker.py +++ b/src/flake8/checker.py @@ -1,4 +1,5 @@ """Checker Manager and Checker classes.""" +import argparse import collections import errno import itertools @@ -6,6 +7,7 @@ import logging import multiprocessing.pool import signal import tokenize +from typing import Any from typing import Dict from typing import List from typing import Optional @@ -16,6 +18,9 @@ from flake8 import exceptions from flake8 import processor from flake8 import utils from flake8.discover_files import expand_paths +from flake8.plugins.finder import Checkers +from flake8.plugins.finder import LoadedPlugin +from flake8.style_guide import StyleGuideManager Results = List[Tuple[str, int, int, str, Optional[str]]] @@ -56,21 +61,15 @@ class Manager: together and make our output deterministic. """ - def __init__(self, style_guide, checker_plugins): - """Initialize our Manager instance. - - :param style_guide: - The instantiated style guide for this instance of Flake8. - :type style_guide: - flake8.style_guide.StyleGuide - :param checker_plugins: - The plugins representing checks parsed from entry-points. - :type checker_plugins: - flake8.plugins.manager.Checkers - """ + def __init__( + self, + style_guide: StyleGuideManager, + plugins: Checkers, + ) -> None: + """Initialize our Manager instance.""" self.style_guide = style_guide self.options = style_guide.options - self.checks = checker_plugins + self.plugins = plugins self.jobs = self._job_count() self._all_checkers: List[FileChecker] = [] self.checkers: List[FileChecker] = [] @@ -158,9 +157,12 @@ class Manager: if paths is None: paths = self.options.filenames - checks = self.checks.to_dictionary() self._all_checkers = [ - FileChecker(filename, checks, self.options) + FileChecker( + filename=filename, + plugins=self.plugins, + options=self.options, + ) for filename in expand_paths( paths=paths, stdin_display_name=self.options.stdin_display_name, @@ -273,23 +275,17 @@ class Manager: class FileChecker: """Manage running checks for a file and aggregate the results.""" - def __init__(self, filename, checks, options): - """Initialize our file checker. - - :param str filename: - Name of the file to check. - :param checks: - The plugins registered to check the file. - :type checks: - dict - :param options: - Parsed option values from config and command-line. - :type options: - argparse.Namespace - """ + def __init__( + self, + *, + filename: str, + plugins: Checkers, + options: argparse.Namespace, + ) -> None: + """Initialize our file checker.""" self.options = options self.filename = filename - self.checks = checks + self.plugins = plugins self.results: Results = [] self.statistics = { "tokens": 0, @@ -342,29 +338,27 @@ class FileChecker: self.results.append((error_code, line_number, column, text, line)) return error_code - def run_check(self, plugin, **arguments): + def run_check(self, plugin: LoadedPlugin, **arguments: Any) -> Any: """Run the check in a single plugin.""" LOG.debug("Running %r with %r", plugin, arguments) assert self.processor is not None try: - self.processor.keyword_arguments_for( - plugin["parameters"], arguments - ) + self.processor.keyword_arguments_for(plugin.parameters, arguments) except AttributeError as ae: LOG.error("Plugin requested unknown parameters.") raise exceptions.PluginRequestedUnknownParameters( - plugin_name=plugin["plugin_name"], exception=ae + plugin_name=plugin.plugin.package, exception=ae ) try: - return plugin["plugin"](**arguments) + return plugin.obj(**arguments) except Exception as all_exc: LOG.critical( "Plugin %s raised an unexpected exception", - plugin["name"], + plugin.display_name, exc_info=True, ) raise exceptions.PluginExecutionFailed( - plugin_name=plugin["plugin_name"], exception=all_exc + plugin_name=plugin.display_name, exception=all_exc ) @staticmethod @@ -431,7 +425,7 @@ class FileChecker: assert self.processor is not None ast = self.processor.build_ast() - for plugin in self.checks["ast_plugins"]: + for plugin in self.plugins.tree: checker = self.run_check(plugin, tree=ast) # If the plugin uses a class, call the run method of it, otherwise # the call should return something iterable itself @@ -457,7 +451,7 @@ class FileChecker: LOG.debug('Logical line: "%s"', logical_line.rstrip()) - for plugin in self.checks["logical_line_plugins"]: + for plugin in self.plugins.logical_line: self.processor.update_checker_state_for(plugin) results = self.run_check(plugin, logical_line=logical_line) or () for offset, text in results: @@ -479,7 +473,7 @@ class FileChecker: A single physical check may return multiple errors. """ assert self.processor is not None - for plugin in self.checks["physical_line_plugins"]: + for plugin in self.plugins.physical_line: self.processor.update_checker_state_for(plugin) result = self.run_check(plugin, physical_line=physical_line) diff --git a/src/flake8/main/application.py b/src/flake8/main/application.py index 1894881..86fd89f 100644 --- a/src/flake8/main/application.py +++ b/src/flake8/main/application.py @@ -3,7 +3,6 @@ import argparse import configparser import json import logging -import sys import time from typing import Dict from typing import List @@ -12,7 +11,6 @@ from typing import Sequence from typing import Set from typing import Tuple from typing import Type -from typing import TYPE_CHECKING import flake8 from flake8 import checker @@ -20,15 +18,14 @@ from flake8 import defaults from flake8 import exceptions from flake8 import style_guide from flake8 import utils +from flake8.formatting.base import BaseFormatter from flake8.main import debug from flake8.main import options from flake8.options import aggregator from flake8.options import config from flake8.options import manager -from flake8.plugins import manager as plugin_manager - -if TYPE_CHECKING: - from flake8.formatting.base import BaseFormatter +from flake8.plugins import finder +from flake8.plugins import reporter LOG = logging.getLogger(__name__) @@ -56,12 +53,7 @@ class Application: #: to parse and handle the options and arguments passed by the user self.option_manager: Optional[manager.OptionManager] = None - #: The instance of :class:`flake8.plugins.manager.Checkers` - self.check_plugins: Optional[plugin_manager.Checkers] = None - #: The instance of :class:`flake8.plugins.manager.ReportFormatters` - self.formatting_plugins: Optional[ - plugin_manager.ReportFormatters - ] = None + self.plugins: Optional[finder.Plugins] = None #: The user-selected formatter from :attr:`formatting_plugins` self.formatter: Optional[BaseFormatter] = None #: The :class:`flake8.style_guide.StyleGuideManager` built from the @@ -130,49 +122,23 @@ class Application: ) -> None: """Find and load the plugins for this application. - Set the :attr:`check_plugins` and :attr:`formatting_plugins` attributes - based on the discovered plugins found. + Set :attr:`plugins` based on loaded plugins. """ - # TODO: move to src/flake8/plugins/finder.py - extension_local = utils.parse_comma_separated_list( - cfg.get("flake8:local-plugins", "extension", fallback="").strip(), - regexp=utils.LOCAL_PLUGIN_LIST_RE, - ) - report_local = utils.parse_comma_separated_list( - cfg.get("flake8:local-plugins", "report", fallback="").strip(), - regexp=utils.LOCAL_PLUGIN_LIST_RE, - ) - - paths_s = cfg.get("flake8:local-plugins", "paths", fallback="").strip() - local_paths = utils.parse_comma_separated_list(paths_s) - local_paths = utils.normalize_paths(local_paths, cfg_dir) - - sys.path.extend(local_paths) - - self.check_plugins = plugin_manager.Checkers(extension_local) - - self.formatting_plugins = plugin_manager.ReportFormatters(report_local) - - self.check_plugins.load_plugins() - self.formatting_plugins.load_plugins() + raw_plugins = finder.find_plugins(cfg) + local_plugin_paths = finder.find_local_plugin_paths(cfg, cfg_dir) + self.plugins = finder.load_plugins(raw_plugins, local_plugin_paths) def register_plugin_options(self) -> None: """Register options provided by plugins to our option manager.""" - assert self.check_plugins is not None - assert self.formatting_plugins is not None + assert self.plugins is not None - versions = sorted(set(self.check_plugins.manager.versions())) self.option_manager = manager.OptionManager( version=flake8.__version__, - plugin_versions=", ".join( - f"{name}: {version}" for name, version in versions - ), + plugin_versions=self.plugins.versions_str(), parents=[self.prelim_arg_parser], ) options.register_default_options(self.option_manager) - - self.check_plugins.register_options(self.option_manager) - self.formatting_plugins.register_options(self.option_manager) + self.option_manager.register_plugins(self.plugins) def parse_configuration_and_cli( self, @@ -182,6 +148,7 @@ class Application: ) -> None: """Parse configuration files and the CLI options.""" assert self.option_manager is not None + assert self.plugins is not None self.options = aggregator.aggregate_options( self.option_manager, cfg, @@ -190,8 +157,7 @@ class Application: ) if self.options.bug_report: - assert self.check_plugins is not None - info = debug.information(flake8.__version__, self.check_plugins) + info = debug.information(flake8.__version__, self.plugins) print(json.dumps(info, indent=2, sort_keys=True)) raise SystemExit(0) @@ -202,44 +168,28 @@ class Application: ) self.parsed_diff = utils.parse_unified_diff() - assert self.check_plugins is not None - self.check_plugins.provide_options( - self.option_manager, self.options, self.options.filenames - ) - assert self.formatting_plugins is not None - self.formatting_plugins.provide_options( - self.option_manager, self.options, self.options.filenames - ) + for loaded in self.plugins.all_plugins(): + parse_options = getattr(loaded.obj, "parse_options", None) + if parse_options is None: + continue - def formatter_for(self, formatter_plugin_name): - """Retrieve the formatter class by plugin name.""" - assert self.formatting_plugins is not None - default_formatter = self.formatting_plugins["default"] - formatter_plugin = self.formatting_plugins.get(formatter_plugin_name) - if formatter_plugin is None: - LOG.warning( - '"%s" is an unknown formatter. Falling back to default.', - formatter_plugin_name, - ) - formatter_plugin = default_formatter - - return formatter_plugin.execute + # XXX: ideally we would't have two forms of parse_options + try: + parse_options( + self.option_manager, + self.options, + self.options.filenames, + ) + except TypeError: + parse_options(self.options) def make_formatter( - self, formatter_class: Optional[Type["BaseFormatter"]] = None + self, formatter_class: Optional[Type[BaseFormatter]] = None ) -> None: """Initialize a formatter based on the parsed options.""" + assert self.plugins is not None assert self.options is not None - format_plugin = self.options.format - if 1 <= self.options.quiet < 2: - format_plugin = "quiet-filename" - elif 2 <= self.options.quiet: - format_plugin = "quiet-nothing" - - if formatter_class is None: - formatter_class = self.formatter_for(format_plugin) - - self.formatter = formatter_class(self.options) + self.formatter = reporter.make(self.plugins.reporters, self.options) def make_guide(self) -> None: """Initialize our StyleGuide.""" @@ -254,9 +204,11 @@ class Application: def make_file_checker_manager(self) -> None: """Initialize our FileChecker Manager.""" + assert self.guide is not None + assert self.plugins is not None self.file_checker_manager = checker.Manager( style_guide=self.guide, - checker_plugins=self.check_plugins, + plugins=self.plugins.checkers, ) def run_checks(self) -> None: diff --git a/src/flake8/main/debug.py b/src/flake8/main/debug.py index 4cd9024..03671bc 100644 --- a/src/flake8/main/debug.py +++ b/src/flake8/main/debug.py @@ -2,30 +2,28 @@ import platform from typing import Any from typing import Dict -from typing import List -from flake8.plugins.manager import PluginTypeManager +from flake8.plugins.finder import Plugins -def information( - version: str, - plugins: PluginTypeManager, -) -> Dict[str, Any]: +def information(version: str, plugins: Plugins) -> Dict[str, Any]: """Generate the information to be printed for the bug report.""" + versions = sorted( + { + (loaded.plugin.package, loaded.plugin.version) + for loaded in plugins.all_plugins() + if loaded.plugin.package not in {"flake8", "local"} + } + ) return { "version": version, - "plugins": plugins_from(plugins), + "plugins": [ + {"plugin": plugin, "version": version} + for plugin, version in versions + ], "platform": { "python_implementation": platform.python_implementation(), "python_version": platform.python_version(), "system": platform.system(), }, } - - -def plugins_from(plugins: PluginTypeManager) -> List[Dict[str, str]]: - """Generate the list of plugins installed.""" - return [ - {"plugin": name, "version": version} - for name, version in sorted(set(plugins.manager.versions())) - ] diff --git a/src/flake8/options/manager.py b/src/flake8/options/manager.py index 5d48ba9..d448e1b 100644 --- a/src/flake8/options/manager.py +++ b/src/flake8/options/manager.py @@ -1,13 +1,11 @@ """Option handling and Option management logic.""" import argparse -import contextlib import enum import functools import logging from typing import Any from typing import Callable from typing import Dict -from typing import Generator from typing import List from typing import Mapping from typing import Optional @@ -18,6 +16,7 @@ from typing import Type from typing import Union from flake8 import utils +from flake8.plugins.finder import Plugins LOG = logging.getLogger(__name__) @@ -351,6 +350,7 @@ class OptionManager: ), ) self.parser.add_argument("filenames", nargs="*", metavar="filename") + self.config_options_dict: Dict[str, Option] = {} self.options: List[Option] = [] self.extended_default_ignore: Set[str] = set() @@ -358,15 +358,32 @@ class OptionManager: self._current_group: Optional[argparse._ArgumentGroup] = None - @contextlib.contextmanager - def group(self, name: str) -> Generator[None, None, None]: - """Attach options to an argparse group during this context.""" - group = self.parser.add_argument_group(name) - self._current_group, orig_group = group, self._current_group - try: - yield - finally: - self._current_group = orig_group + # TODO: maybe make this a free function to reduce api surface area + def register_plugins(self, plugins: Plugins) -> None: + """Register the plugin options (if needed).""" + groups: Dict[str, argparse._ArgumentGroup] = {} + + def _set_group(name: str) -> None: + try: + self._current_group = groups[name] + except KeyError: + group = self.parser.add_argument_group(name) + self._current_group = groups[name] = group + + for loaded in plugins.all_plugins(): + add_options = getattr(loaded.obj, "add_options", None) + if add_options: + _set_group(loaded.plugin.package) + add_options(self) + + # if the plugin is off by default, disable it! + if getattr(loaded.obj, "off_by_default", False): + self.extend_default_ignore(loaded.entry_name) + else: + self.extend_default_select(loaded.entry_name) + + # isn't strictly necessary, but seems cleaner + self._current_group = None def add_option(self, *args: Any, **kwargs: Any) -> None: """Create and register a new option. diff --git a/src/flake8/plugins/finder.py b/src/flake8/plugins/finder.py new file mode 100644 index 0000000..4609474 --- /dev/null +++ b/src/flake8/plugins/finder.py @@ -0,0 +1,256 @@ +"""Functions related to finding and loading plugins.""" +import configparser +import inspect +import logging +import sys +from typing import Any +from typing import Dict +from typing import Generator +from typing import Iterable +from typing import List +from typing import NamedTuple + +from flake8 import utils +from flake8._compat import importlib_metadata +from flake8.exceptions import FailedToLoadPlugin + +LOG = logging.getLogger(__name__) + +FLAKE8_GROUPS = frozenset(("flake8.extension", "flake8.report")) + +BANNED_PLUGINS = { + "flake8-colors": "4.1", + "flake8-per-file-ignores": "3.7", +} + + +class Plugin(NamedTuple): + """A plugin before loading.""" + + package: str + version: str + entry_point: importlib_metadata.EntryPoint + + +class LoadedPlugin(NamedTuple): + """Represents a plugin after being imported.""" + + plugin: Plugin + obj: Any + parameters: Dict[str, bool] + + @property + def entry_name(self) -> str: + """Return the name given in the packaging metadata.""" + return self.plugin.entry_point.name + + @property + def display_name(self) -> str: + """Return the name for use in user-facing / error messages.""" + return f"{self.plugin.package}[{self.entry_name}]" + + +class Checkers(NamedTuple): + """Classified plugins needed for checking.""" + + tree: List[LoadedPlugin] + logical_line: List[LoadedPlugin] + physical_line: List[LoadedPlugin] + + +class Plugins(NamedTuple): + """Classified plugins.""" + + checkers: Checkers + reporters: Dict[str, LoadedPlugin] + + def all_plugins(self) -> Generator[LoadedPlugin, None, None]: + """Return an iterator over all :class:`LoadedPlugin`s.""" + yield from self.checkers.tree + yield from self.checkers.logical_line + yield from self.checkers.physical_line + yield from self.reporters.values() + + def versions_str(self) -> str: + """Return a user-displayed list of plugin versions.""" + return ", ".join( + sorted( + { + f"{loaded.plugin.package}: {loaded.plugin.version}" + for loaded in self.all_plugins() + if loaded.plugin.package not in {"flake8", "local"} + } + ) + ) + + +def _flake8_plugins( + eps: Iterable[importlib_metadata.EntryPoint], + name: str, + version: str, +) -> Generator[Plugin, None, None]: + pyflakes_meta = importlib_metadata.distribution("pyflakes").metadata + pycodestyle_meta = importlib_metadata.distribution("pycodestyle").metadata + + for ep in eps: + if ep.group not in FLAKE8_GROUPS: + continue + + if ep.name == "F": + yield Plugin(pyflakes_meta["name"], pyflakes_meta["version"], ep) + elif ep.name.startswith("pycodestyle"): + yield Plugin( + pycodestyle_meta["name"], pycodestyle_meta["version"], ep + ) + else: + yield Plugin(name, version, ep) + + +def _find_importlib_plugins() -> Generator[Plugin, None, None]: + for dist in importlib_metadata.distributions(): + # assigned to prevent continual reparsing + eps = dist.entry_points + + # perf: skip parsing `.metadata` (slow) if no entry points match + if not any(ep.group in FLAKE8_GROUPS for ep in eps): + continue + + # assigned to prevent continual reparsing + meta = dist.metadata + + if meta["name"] in BANNED_PLUGINS: + LOG.warning( + "%s plugin is obsolete in flake8>=%s", + meta["name"], + BANNED_PLUGINS[meta["name"]], + ) + continue + elif meta["name"] == "flake8": + # special case flake8 which provides plugins for pyflakes / + # pycodestyle + yield from _flake8_plugins(eps, meta["name"], meta["version"]) + continue + + for ep in eps: + if ep.group in FLAKE8_GROUPS: + yield Plugin(meta["name"], meta["version"], ep) + + +def _find_local_plugins( + cfg: configparser.RawConfigParser, +) -> Generator[Plugin, None, None]: + for plugin_type in ("extension", "report"): + group = f"flake8.{plugin_type}" + for plugin_s in utils.parse_comma_separated_list( + cfg.get("flake8:local-plugins", plugin_type, fallback="").strip(), + regexp=utils.LOCAL_PLUGIN_LIST_RE, + ): + name, _, entry_str = plugin_s.partition("=") + name, entry_str = name.strip(), entry_str.strip() + ep = importlib_metadata.EntryPoint(name, entry_str, group) + yield Plugin("local", "local", ep) + + +def find_plugins(cfg: configparser.RawConfigParser) -> List[Plugin]: + """Discovers all plugins (but does not load them).""" + ret = [*_find_importlib_plugins(), *_find_local_plugins(cfg)] + + # for determinism, sort the list + ret.sort() + + return ret + + +def find_local_plugin_paths( + cfg: configparser.RawConfigParser, + cfg_dir: str, +) -> List[str]: + """Discovers the list of ``flake8:local-plugins`` ``paths``.""" + paths_s = cfg.get("flake8:local-plugins", "paths", fallback="").strip() + paths = utils.parse_comma_separated_list(paths_s) + return utils.normalize_paths(paths, cfg_dir) + + +def _parameters_for(func: Any) -> Dict[str, bool]: + """Return the parameters for the plugin. + + This will inspect the plugin and return either the function parameters + if the plugin is a function or the parameters for ``__init__`` after + ``self`` if the plugin is a class. + + :returns: + A dictionary mapping the parameter name to whether or not it is + required (a.k.a., is positional only/does not have a default). + """ + is_class = not inspect.isfunction(func) + if is_class: + func = func.__init__ + + parameters = { + parameter.name: parameter.default is inspect.Parameter.empty + for parameter in inspect.signature(func).parameters.values() + if parameter.kind is inspect.Parameter.POSITIONAL_OR_KEYWORD + } + + if is_class: + parameters.pop("self", None) + + return parameters + + +def _load_plugin(plugin: Plugin) -> LoadedPlugin: + try: + obj = plugin.entry_point.load() + except Exception as e: + raise FailedToLoadPlugin(plugin.package, e) + + if not callable(obj): + err = TypeError("expected loaded plugin to be callable") + raise FailedToLoadPlugin(plugin.package, err) + + return LoadedPlugin(plugin, obj, _parameters_for(obj)) + + +def _import_plugins( + plugins: List[Plugin], paths: List[str] +) -> List[LoadedPlugin]: + sys.path.extend(paths) + return [_load_plugin(p) for p in plugins] + + +def _classify_plugins(plugins: List[LoadedPlugin]) -> Plugins: + tree = [] + logical_line = [] + physical_line = [] + reporters = {} + + for loaded in plugins: + if loaded.plugin.entry_point.group == "flake8.report": + reporters[loaded.entry_name] = loaded + elif "tree" in loaded.parameters: + tree.append(loaded) + elif "logical_line" in loaded.parameters: + logical_line.append(loaded) + elif "physical_line" in loaded.parameters: + physical_line.append(loaded) + else: + raise NotImplementedError(f"what plugin type? {loaded}") + + return Plugins( + checkers=Checkers( + tree=tree, + logical_line=logical_line, + physical_line=physical_line, + ), + reporters=reporters, + ) + + +def load_plugins(plugins: List[Plugin], paths: List[str]) -> Plugins: + """Load and classify all flake8 plugins. + + - first: extends ``sys.path`` with ``paths`` (to import local plugins) + - next: converts the ``Plugin``s to ``LoadedPlugins`` + - finally: classifies plugins into their specific types + """ + return _classify_plugins(_import_plugins(plugins, paths)) diff --git a/src/flake8/plugins/manager.py b/src/flake8/plugins/manager.py deleted file mode 100644 index fe9b038..0000000 --- a/src/flake8/plugins/manager.py +++ /dev/null @@ -1,530 +0,0 @@ -"""Plugin loading and management logic and classes.""" -import logging -from typing import Any -from typing import Dict -from typing import List -from typing import Optional -from typing import Set - -from flake8 import exceptions -from flake8 import utils -from flake8._compat import importlib_metadata - -LOG = logging.getLogger(__name__) - -__all__ = ("Checkers", "Plugin", "PluginManager", "ReportFormatters") - -NO_GROUP_FOUND = object() - - -class Plugin: - """Wrap an EntryPoint from setuptools and other logic.""" - - def __init__(self, name, entry_point, local=False): - """Initialize our Plugin. - - :param str name: - Name of the entry-point as it was registered with setuptools. - :param entry_point: - EntryPoint returned by setuptools. - :type entry_point: - setuptools.EntryPoint - :param bool local: - Is this a repo-local plugin? - """ - self.name = name - self.entry_point = entry_point - self.local = local - self._plugin: Any = None - self._parameters = None - self._parameter_names: Optional[List[str]] = None - self._group = None - self._plugin_name = None - self._version = None - - def __repr__(self) -> str: - """Provide an easy to read description of the current plugin.""" - return 'Plugin(name="{}", entry_point="{}")'.format( - self.name, self.entry_point.value - ) - - def to_dictionary(self): - """Convert this plugin to a dictionary.""" - return { - "name": self.name, - "parameters": self.parameters, - "parameter_names": self.parameter_names, - "plugin": self.plugin, - "plugin_name": self.plugin_name, - } - - def is_in_a_group(self): - """Determine if this plugin is in a group. - - :returns: - True if the plugin is in a group, otherwise False. - :rtype: - bool - """ - return self.group() is not None - - def group(self): - """Find and parse the group the plugin is in.""" - if self._group is None: - name = self.name.split(".", 1) - if len(name) > 1: - self._group = name[0] - else: - self._group = NO_GROUP_FOUND - if self._group is NO_GROUP_FOUND: - return None - return self._group - - @property - def parameters(self): - """List of arguments that need to be passed to the plugin.""" - if self._parameters is None: - self._parameters = utils.parameters_for(self) - return self._parameters - - @property - def parameter_names(self) -> List[str]: - """List of argument names that need to be passed to the plugin.""" - if self._parameter_names is None: - self._parameter_names = list(self.parameters) - return self._parameter_names - - @property - def plugin(self): - """Load and return the plugin associated with the entry-point. - - This property implicitly loads the plugin and then caches it. - """ - self.load_plugin() - return self._plugin - - @property - def version(self) -> str: - """Return the version of the plugin.""" - version = self._version - if version is None: - if self.is_in_a_group(): - version = self._version = version_for(self) - else: - version = self._version = self.plugin.version - return version - - @property - def plugin_name(self): - """Return the name of the plugin.""" - if self._plugin_name is None: - if self.is_in_a_group(): - self._plugin_name = self.group() - else: - self._plugin_name = self.plugin.name - - return self._plugin_name - - @property - def off_by_default(self): - """Return whether the plugin is ignored by default.""" - return getattr(self.plugin, "off_by_default", False) - - def execute(self, *args, **kwargs): - r"""Call the plugin with \*args and \*\*kwargs.""" - return self.plugin(*args, **kwargs) # pylint: disable=not-callable - - def _load(self): - self._plugin = self.entry_point.load() - if not callable(self._plugin): - msg = ( - f"Plugin {self._plugin!r} is not a callable. It might be " - f"written for an older version of flake8 and might not work " - f"with this version" - ) - LOG.critical(msg) - raise TypeError(msg) - - def load_plugin(self): - """Retrieve the plugin for this entry-point. - - This loads the plugin, stores it on the instance and then returns it. - It does not reload it after the first time, it merely returns the - cached plugin. - - :returns: - Nothing - """ - if self._plugin is None: - LOG.info('Loading plugin "%s" from entry-point.', self.name) - try: - self._load() - except Exception as load_exception: - LOG.exception(load_exception) - failed_to_load = exceptions.FailedToLoadPlugin( - plugin_name=self.name, exception=load_exception - ) - LOG.critical(str(failed_to_load)) - raise failed_to_load - - def enable(self, optmanager, options=None): - """Remove plugin name from the default ignore list.""" - optmanager.remove_from_default_ignore([self.name]) - optmanager.extend_default_select([self.name]) - if not options: - return - try: - options.ignore.remove(self.name) - except (ValueError, KeyError): - LOG.debug( - "Attempted to remove %s from the ignore list but it was " - "not a member of the list.", - self.name, - ) - - def disable(self, optmanager): - """Add the plugin name to the default ignore list.""" - optmanager.extend_default_ignore([self.name]) - - def provide_options(self, optmanager, options, extra_args): - """Pass the parsed options and extra arguments to the plugin.""" - parse_options = getattr(self.plugin, "parse_options", None) - if parse_options is not None: - LOG.debug('Providing options to plugin "%s".', self.name) - try: - parse_options(optmanager, options, extra_args) - except TypeError: - parse_options(options) - - if self.name in options.enable_extensions: - self.enable(optmanager, options) - - def register_options(self, optmanager): - """Register the plugin's command-line options on the OptionManager. - - :param optmanager: - Instantiated OptionManager to register options on. - :type optmanager: - flake8.options.manager.OptionManager - :returns: - Nothing - """ - add_options = getattr(self.plugin, "add_options", None) - if add_options is not None: - LOG.debug( - 'Registering options from plugin "%s" on OptionManager %r', - self.name, - optmanager, - ) - with optmanager.group(self.plugin_name): - add_options(optmanager) - - if self.off_by_default: - self.disable(optmanager) - - -class PluginManager: # pylint: disable=too-few-public-methods - """Find and manage plugins consistently.""" - - def __init__( - self, namespace: str, local_plugins: Optional[List[str]] = None - ) -> None: - """Initialize the manager. - - :param str namespace: - Namespace of the plugins to manage, e.g., 'flake8.extension'. - :param list local_plugins: - Plugins from config (as "X = path.to:Plugin" strings). - """ - self.namespace = namespace - self.plugins: Dict[str, Plugin] = {} - self.names: List[str] = [] - self._load_local_plugins(local_plugins or []) - self._load_entrypoint_plugins() - - def _load_local_plugins(self, local_plugins): - """Load local plugins from config. - - :param list local_plugins: - Plugins from config (as "X = path.to:Plugin" strings). - """ - for plugin_str in local_plugins: - name, _, entry_str = plugin_str.partition("=") - name, entry_str = name.strip(), entry_str.strip() - entry_point = importlib_metadata.EntryPoint( - name, entry_str, self.namespace - ) - self._load_plugin_from_entrypoint(entry_point, local=True) - - def _load_entrypoint_plugins(self): - LOG.info('Loading entry-points for "%s".', self.namespace) - eps = importlib_metadata.entry_points().get(self.namespace, ()) - # python2.7 occasionally gives duplicate results due to redundant - # `local/lib` -> `../lib` symlink on linux in virtualenvs so we - # eliminate duplicates here - for entry_point in sorted(frozenset(eps)): - if entry_point.name == "per-file-ignores": - LOG.warning( - "flake8-per-file-ignores plugin is incompatible with " - "flake8>=3.7 (which implements per-file-ignores itself)." - ) - continue - elif entry_point.name == "flake8-colors": - LOG.warning( - "flake8-colors plugin is incompatible with " - "flake8>=4.1 (which implements colors itself)." - ) - continue - self._load_plugin_from_entrypoint(entry_point) - - def _load_plugin_from_entrypoint(self, entry_point, local=False): - """Load a plugin from a setuptools EntryPoint. - - :param EntryPoint entry_point: - EntryPoint to load plugin from. - :param bool local: - Is this a repo-local plugin? - """ - name = entry_point.name - self.plugins[name] = Plugin(name, entry_point, local=local) - self.names.append(name) - LOG.debug('Loaded %r for plugin "%s".', self.plugins[name], name) - - def map(self, func, *args, **kwargs): - r"""Call ``func`` with the plugin and \*args and \**kwargs after. - - This yields the return value from ``func`` for each plugin. - - :param collections.Callable func: - Function to call with each plugin. Signature should at least be: - - .. code-block:: python - - def myfunc(plugin): - pass - - Any extra positional or keyword arguments specified with map will - be passed along to this function after the plugin. The plugin - passed is a :class:`~flake8.plugins.manager.Plugin`. - :param args: - Positional arguments to pass to ``func`` after each plugin. - :param kwargs: - Keyword arguments to pass to ``func`` after each plugin. - """ - for name in self.names: - yield func(self.plugins[name], *args, **kwargs) - - def versions(self): - # () -> (str, str) - """Generate the versions of plugins. - - :returns: - Tuples of the plugin_name and version - :rtype: - tuple - """ - plugins_seen: Set[str] = set() - for entry_point_name in self.names: - plugin = self.plugins[entry_point_name] - plugin_name = plugin.plugin_name - if plugin.plugin_name in plugins_seen: - continue - plugins_seen.add(plugin_name) - yield (plugin_name, plugin.version) - - -def version_for(plugin): - # (Plugin) -> Optional[str] - """Determine the version of a plugin by its module. - - :param plugin: - The loaded plugin - :type plugin: - Plugin - :returns: - version string for the module - :rtype: - str - """ - module_name = plugin.plugin.__module__ - try: - module = __import__(module_name) - except ImportError: - return None - - return getattr(module, "__version__", None) - - -class PluginTypeManager: - """Parent class for most of the specific plugin types.""" - - namespace: str - - def __init__(self, local_plugins=None): - """Initialize the plugin type's manager. - - :param list local_plugins: - Plugins from config file instead of entry-points - """ - self.manager = PluginManager( - self.namespace, local_plugins=local_plugins - ) - self.plugins_loaded = False - - def __contains__(self, name): - """Check if the entry-point name is in this plugin type manager.""" - LOG.debug('Checking for "%s" in plugin type manager.', name) - return name in self.plugins - - def __getitem__(self, name): - """Retrieve a plugin by its name.""" - LOG.debug('Retrieving plugin for "%s".', name) - return self.plugins[name] - - def get(self, name, default=None): - """Retrieve the plugin referred to by ``name`` or return the default. - - :param str name: - Name of the plugin to retrieve. - :param default: - Default value to return. - :returns: - Plugin object referred to by name, if it exists. - :rtype: - :class:`Plugin` - """ - if name in self: - return self[name] - return default - - @property - def names(self): - """Proxy attribute to underlying manager.""" - return self.manager.names - - @property - def plugins(self): - """Proxy attribute to underlying manager.""" - return self.manager.plugins - - @staticmethod - def _generate_call_function(method_name, optmanager, *args, **kwargs): - def generated_function(plugin): - method = getattr(plugin, method_name, None) - if method is not None and callable(method): - return method(optmanager, *args, **kwargs) - - return generated_function - - def load_plugins(self): - """Load all plugins of this type that are managed by this manager.""" - if self.plugins_loaded: - return - - for plugin in self.plugins.values(): - plugin.load_plugin() - - # Do not set plugins_loaded if we run into an exception - self.plugins_loaded = True - - def register_options(self, optmanager): - """Register all of the checkers' options to the OptionManager.""" - self.load_plugins() - call_register_options = self._generate_call_function( - "register_options", optmanager - ) - - list(self.manager.map(call_register_options)) - - def provide_options(self, optmanager, options, extra_args): - """Provide parsed options and extra arguments to the plugins.""" - call_provide_options = self._generate_call_function( - "provide_options", optmanager, options, extra_args - ) - - list(self.manager.map(call_provide_options)) - - -class Checkers(PluginTypeManager): - """All of the checkers registered through entry-points or config.""" - - namespace = "flake8.extension" - - def checks_expecting(self, argument_name): - """Retrieve checks that expect an argument with the specified name. - - Find all checker plugins that are expecting a specific argument. - """ - for plugin in self.plugins.values(): - if argument_name == plugin.parameter_names[0]: - yield plugin - - def to_dictionary(self): - """Return a dictionary of AST and line-based plugins.""" - return { - "ast_plugins": [ - plugin.to_dictionary() for plugin in self.ast_plugins - ], - "logical_line_plugins": [ - plugin.to_dictionary() for plugin in self.logical_line_plugins - ], - "physical_line_plugins": [ - plugin.to_dictionary() for plugin in self.physical_line_plugins - ], - } - - def register_options(self, optmanager): - """Register all of the checkers' options to the OptionManager. - - This also ensures that plugins that are not part of a group and are - enabled by default are enabled on the option manager. - """ - # NOTE(sigmavirus24) We reproduce a little of - # PluginTypeManager.register_options to reduce the number of times - # that we loop over the list of plugins. Instead of looping twice, - # option registration and enabling the plugin, we loop once with one - # function to map over the plugins. - self.load_plugins() - call_register_options = self._generate_call_function( - "register_options", optmanager - ) - - def register_and_enable(plugin): - call_register_options(plugin) - if plugin.group() is None and not plugin.off_by_default: - plugin.enable(optmanager) - - list(self.manager.map(register_and_enable)) - - @property - def ast_plugins(self): - """List of plugins that expect the AST tree.""" - plugins = getattr(self, "_ast_plugins", []) - if not plugins: - plugins = list(self.checks_expecting("tree")) - self._ast_plugins = plugins - return plugins - - @property - def logical_line_plugins(self): - """List of plugins that expect the logical lines.""" - plugins = getattr(self, "_logical_line_plugins", []) - if not plugins: - plugins = list(self.checks_expecting("logical_line")) - self._logical_line_plugins = plugins - return plugins - - @property - def physical_line_plugins(self): - """List of plugins that expect the physical lines.""" - plugins = getattr(self, "_physical_line_plugins", []) - if not plugins: - plugins = list(self.checks_expecting("physical_line")) - self._physical_line_plugins = plugins - return plugins - - -class ReportFormatters(PluginTypeManager): - """All of the report formatters registered through entry-points/config.""" - - namespace = "flake8.report" diff --git a/src/flake8/plugins/pyflakes.py b/src/flake8/plugins/pyflakes.py index 7509438..ba3493a 100644 --- a/src/flake8/plugins/pyflakes.py +++ b/src/flake8/plugins/pyflakes.py @@ -69,8 +69,6 @@ FLAKE8_PYFLAKES_CODES = { class FlakesChecker(pyflakes.checker.Checker): """Subclass the Pyflakes checker to conform with the flake8 API.""" - name = "pyflakes" - version = pyflakes.__version__ with_doctest = False include_in_doctest: List[str] = [] exclude_from_doctest: List[str] = [] diff --git a/src/flake8/plugins/reporter.py b/src/flake8/plugins/reporter.py new file mode 100644 index 0000000..5bbbd81 --- /dev/null +++ b/src/flake8/plugins/reporter.py @@ -0,0 +1,41 @@ +"""Functions for construcing the requested report plugin.""" +import argparse +import logging +from typing import Dict + +from flake8.formatting.base import BaseFormatter +from flake8.plugins.finder import LoadedPlugin + +LOG = logging.getLogger(__name__) + + +def make( + reporters: Dict[str, LoadedPlugin], + options: argparse.Namespace, +) -> BaseFormatter: + """Make the formatter from the requested user options. + + - if :option:`flake8 --quiet` is specified, return the ``quiet-filename`` + formatter. + - if :option:`flake8 --quiet` is specified at least twice, return the + ``quiet-nothing`` formatter. + - otherwise attempt to return the formatter by name. + - failing that, assume it is a format string and return the ``default`` + formatter. + """ + format_name = options.format + if options.quiet == 1: + format_name = "quiet-filename" + elif options.quiet >= 2: + format_name = "quiet-nothing" + + try: + format_plugin = reporters[format_name] + except KeyError: + LOG.warning( + "%r is an unknown formatter. Falling back to default.", + format_name, + ) + format_plugin = reporters["default"] + + return format_plugin.obj(options) diff --git a/src/flake8/processor.py b/src/flake8/processor.py index 2649c9d..d0652d8 100644 --- a/src/flake8/processor.py +++ b/src/flake8/processor.py @@ -14,6 +14,7 @@ from typing import Tuple import flake8 from flake8 import defaults from flake8 import utils +from flake8.plugins.finder import LoadedPlugin LOG = logging.getLogger(__name__) NEWLINE = frozenset([tokenize.NL, tokenize.NEWLINE]) @@ -160,11 +161,11 @@ class FileProcessor: if self.blank_before < self.blank_lines: self.blank_before = self.blank_lines - def update_checker_state_for(self, plugin: Dict[str, Any]) -> None: + def update_checker_state_for(self, plugin: LoadedPlugin) -> None: """Update the checker_state attribute for the plugin.""" - if "checker_state" in plugin["parameters"]: + if "checker_state" in plugin.parameters: self.checker_state = self._checker_states.setdefault( - plugin["name"], {} + plugin.entry_name, {} ) def next_logical_line(self) -> None: diff --git a/src/flake8/utils.py b/src/flake8/utils.py index 9eb2497..a5a1901 100644 --- a/src/flake8/utils.py +++ b/src/flake8/utils.py @@ -2,7 +2,6 @@ import collections import fnmatch as _fnmatch import functools -import inspect import io import logging import os @@ -18,14 +17,10 @@ from typing import Pattern from typing import Sequence from typing import Set from typing import Tuple -from typing import TYPE_CHECKING from typing import Union from flake8 import exceptions -if TYPE_CHECKING: - from flake8.plugins.manager import Plugin - DIFF_HUNK_REGEXP = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? @@.*$") COMMA_SEPARATED_LIST_RE = re.compile(r"[,\s]") LOCAL_PLUGIN_LIST_RE = re.compile(r"[,\t\n\r\f\v]") @@ -310,40 +305,6 @@ def fnmatch(filename: str, patterns: Sequence[str]) -> bool: return any(_fnmatch.fnmatch(filename, pattern) for pattern in patterns) -def parameters_for(plugin: "Plugin") -> Dict[str, bool]: - """Return the parameters for the plugin. - - This will inspect the plugin and return either the function parameters - if the plugin is a function or the parameters for ``__init__`` after - ``self`` if the plugin is a class. - - :param plugin: - The internal plugin object. - :type plugin: - flake8.plugins.manager.Plugin - :returns: - A dictionary mapping the parameter name to whether or not it is - required (a.k.a., is positional only/does not have a default). - :rtype: - dict([(str, bool)]) - """ - func = plugin.plugin - is_class = not inspect.isfunction(func) - if is_class: # The plugin is a class - func = plugin.plugin.__init__ - - parameters = { - parameter.name: parameter.default is parameter.empty - for parameter in inspect.signature(func).parameters.values() - if parameter.kind == parameter.POSITIONAL_OR_KEYWORD - } - - if is_class: - parameters.pop("self", None) - - return parameters - - def matches_filename( path: str, patterns: Sequence[str], diff --git a/tests/fixtures/config_files/README.rst b/tests/fixtures/config_files/README.rst deleted file mode 100644 index 8796051..0000000 --- a/tests/fixtures/config_files/README.rst +++ /dev/null @@ -1,22 +0,0 @@ -About this directory -==================== - -The files in this directory are test fixtures for unit and integration tests. -Their purpose is described below. Please note the list of file names that can -not be created as they are already used by tests. - -New fixtures are preferred over updating existing features unless existing -tests will fail. - -Files that should not be created --------------------------------- - -- ``tests/fixtures/config_files/missing.ini`` - -Purposes of existing fixtures ------------------------------ - -``tests/fixtures/config_files/local-plugin.ini`` - - This is for testing configuring a plugin via flake8 config file instead of - setuptools entry-point. diff --git a/tests/fixtures/config_files/local-plugin-path.ini b/tests/fixtures/config_files/local-plugin-path.ini deleted file mode 100644 index 7368c1e..0000000 --- a/tests/fixtures/config_files/local-plugin-path.ini +++ /dev/null @@ -1,5 +0,0 @@ -[flake8:local-plugins] -extension = - XE = aplugin:ExtensionTestPlugin2 -paths = - ../../integration/subdir/ diff --git a/tests/fixtures/config_files/local-plugin.ini b/tests/fixtures/config_files/local-plugin.ini deleted file mode 100644 index 8344f76..0000000 --- a/tests/fixtures/config_files/local-plugin.ini +++ /dev/null @@ -1,5 +0,0 @@ -[flake8:local-plugins] -extension = - XE = tests.integration.test_plugins:ExtensionTestPlugin -report = - XR = tests.integration.test_plugins:ReportTestPlugin diff --git a/tests/integration/subdir/aplugin.py b/tests/integration/subdir/aplugin.py index fde5890..801f2c0 100644 --- a/tests/integration/subdir/aplugin.py +++ b/tests/integration/subdir/aplugin.py @@ -4,9 +4,6 @@ class ExtensionTestPlugin2: """Extension test plugin in its own directory.""" - name = "ExtensionTestPlugin2" - version = "1.0.0" - def __init__(self, tree): """Construct an instance of test plugin.""" diff --git a/tests/integration/test_checker.py b/tests/integration/test_checker.py index 29db6d3..96b7d4b 100644 --- a/tests/integration/test_checker.py +++ b/tests/integration/test_checker.py @@ -6,28 +6,19 @@ import pytest from flake8 import checker from flake8._compat import importlib_metadata -from flake8.plugins import manager +from flake8.plugins import finder from flake8.processor import FileProcessor PHYSICAL_LINE = "# Physical line content" EXPECTED_REPORT = (1, 1, "T000 Expected Message") EXPECTED_REPORT_PHYSICAL_LINE = (1, "T000 Expected Message") -EXPECTED_RESULT_PHYSICAL_LINE = ( - "T000", - 0, - 1, - "Expected Message", - None, -) +EXPECTED_RESULT_PHYSICAL_LINE = ("T000", 0, 1, "Expected Message", None) class PluginClass: """Simple file plugin class yielding the expected report.""" - name = "test" - version = "1.0.0" - def __init__(self, tree): """Construct a dummy object to provide mandatory parameter.""" pass @@ -37,87 +28,78 @@ class PluginClass: yield EXPECTED_REPORT + (type(self),) -def plugin_func(func): - """Decorate file plugins which are implemented as functions.""" - func.name = "test" - func.version = "1.0.0" - return func - - -@plugin_func def plugin_func_gen(tree): """Yield the expected report.""" yield EXPECTED_REPORT + (type(plugin_func_gen),) -@plugin_func def plugin_func_list(tree): """Return a list of expected reports.""" return [EXPECTED_REPORT + (type(plugin_func_list),)] -@plugin_func def plugin_func_physical_ret(physical_line): """Expect report from a physical_line. Single return.""" return EXPECTED_REPORT_PHYSICAL_LINE -@plugin_func def plugin_func_physical_none(physical_line): """Expect report from a physical_line. No results.""" return None -@plugin_func def plugin_func_physical_list_single(physical_line): """Expect report from a physical_line. List of single result.""" return [EXPECTED_REPORT_PHYSICAL_LINE] -@plugin_func def plugin_func_physical_list_multiple(physical_line): """Expect report from a physical_line. List of multiple results.""" return [EXPECTED_REPORT_PHYSICAL_LINE] * 2 -@plugin_func def plugin_func_physical_gen_single(physical_line): """Expect report from a physical_line. Generator of single result.""" yield EXPECTED_REPORT_PHYSICAL_LINE -@plugin_func def plugin_func_physical_gen_multiple(physical_line): """Expect report from a physical_line. Generator of multiple results.""" for _ in range(3): yield EXPECTED_REPORT_PHYSICAL_LINE +def plugin_func_out_of_bounds(logical_line): + """This produces an error out of bounds.""" + yield 10000, "L100 test" + + def mock_file_checker_with_plugin(plugin_target): """Get a mock FileChecker class with plugin_target registered. Useful as a starting point for mocking reports/results. """ - # Mock an entry point returning the plugin target - entry_point = mock.Mock(spec=["load"]) - entry_point.name = plugin_target.name - entry_point.load.return_value = plugin_target - entry_point.value = "mocked:value" - - # Load the checker plugins using the entry point mock - with mock.patch.object( - importlib_metadata, - "entry_points", - return_value={"flake8.extension": [entry_point]}, - ): - checks = manager.Checkers() + to_load = [ + finder.Plugin( + "flake-package", + "9001", + importlib_metadata.EntryPoint( + "Q", + f"{plugin_target.__module__}:{plugin_target.__name__}", + "flake8.extension", + ), + ), + ] + plugins = finder.load_plugins(to_load, []) # Prevent it from reading lines from stdin or somewhere else with mock.patch( "flake8.processor.FileProcessor.read_lines", return_value=["Line 1"] ): file_checker = checker.FileChecker( - "-", checks.to_dictionary(), mock.MagicMock() + filename="-", + plugins=plugins.checkers, + options=mock.MagicMock(), ) return file_checker @@ -173,11 +155,7 @@ def test_line_check_results(plugin_target, len_results): def test_logical_line_offset_out_of_bounds(): """Ensure that logical line offsets that are out of bounds do not crash.""" - @plugin_func - def _logical_line_out_of_bounds(logical_line): - yield 10000, "L100 test" - - file_checker = mock_file_checker_with_plugin(_logical_line_out_of_bounds) + file_checker = mock_file_checker_with_plugin(plugin_func_out_of_bounds) logical_ret = ( "", @@ -293,7 +271,7 @@ def test_report_order(results, expected_order): # Create a placeholder manager without arguments or plugins # Just add one custom file checker which just provides the results - manager = checker.Manager(style_guide, []) + manager = checker.Manager(style_guide, finder.Checkers([], [], [])) manager.checkers = manager._all_checkers = [file_checker] # _handle_results is the first place which gets the sorted result @@ -357,6 +335,10 @@ def test_handling_syntaxerrors_across_pythons(): "invalid syntax", ("", 2, 1, "bad python:\n", 2, 11) ) expected = (2, 1) - file_checker = checker.FileChecker("-", {}, mock.MagicMock()) + file_checker = checker.FileChecker( + filename="-", + plugins=finder.Checkers([], [], []), + options=mock.MagicMock(), + ) actual = file_checker._extract_syntax_information(err) assert actual == expected diff --git a/tests/integration/test_plugins.py b/tests/integration/test_plugins.py index 7d28aff..682360c 100644 --- a/tests/integration/test_plugins.py +++ b/tests/integration/test_plugins.py @@ -1,17 +1,18 @@ """Integration tests for plugin loading.""" -from flake8.main import application -from flake8.main.cli import main +import pytest -LOCAL_PLUGIN_CONFIG = "tests/fixtures/config_files/local-plugin.ini" -LOCAL_PLUGIN_PATH_CONFIG = "tests/fixtures/config_files/local-plugin-path.ini" +from flake8.main.cli import main +from flake8.main.options import register_default_options +from flake8.main.options import stage1_arg_parser +from flake8.options import aggregator +from flake8.options import config +from flake8.options.manager import OptionManager +from flake8.plugins import finder class ExtensionTestPlugin: """Extension test plugin.""" - name = "ExtensionTestPlugin" - version = "1.0.0" - def __init__(self, tree): """Construct an instance of test plugin.""" @@ -27,9 +28,6 @@ class ExtensionTestPlugin: class ReportTestPlugin: """Report test plugin.""" - name = "ReportTestPlugin" - version = "1.0.0" - def __init__(self, tree): """Construct an instance of test plugin.""" @@ -37,41 +35,69 @@ class ReportTestPlugin: """Do nothing.""" -def test_enable_local_plugin_from_config(): +@pytest.fixture +def local_config(tmp_path): + cfg_s = f"""\ +[flake8:local-plugins] +extension = + XE = {ExtensionTestPlugin.__module__}:{ExtensionTestPlugin.__name__} +report = + XR = {ReportTestPlugin.__module__}:{ReportTestPlugin.__name__} +""" + cfg = tmp_path.joinpath("tox.ini") + cfg.write_text(cfg_s) + + return str(cfg) + + +def test_enable_local_plugin_from_config(local_config): """App can load a local plugin from config file.""" - app = application.Application() - app.initialize(["flake8", "--config", LOCAL_PLUGIN_CONFIG]) + cfg, cfg_dir = config.load_config(local_config, [], isolated=False) + plugins = finder.find_plugins(cfg) + plugin_paths = finder.find_local_plugin_paths(cfg, cfg_dir) + loaded_plugins = finder.load_plugins(plugins, plugin_paths) - assert app.check_plugins is not None - assert app.check_plugins["XE"].plugin is ExtensionTestPlugin - assert app.formatting_plugins is not None - assert app.formatting_plugins["XR"].plugin is ReportTestPlugin + (custom_extension,) = ( + loaded + for loaded in loaded_plugins.checkers.tree + if loaded.entry_name == "XE" + ) + custom_report = loaded_plugins.reporters["XR"] + + assert custom_extension.obj is ExtensionTestPlugin + assert custom_report.obj is ReportTestPlugin -def test_local_plugin_can_add_option(): +def test_local_plugin_can_add_option(local_config): """A local plugin can add a CLI option.""" - app = application.Application() - app.initialize( - ["flake8", "--config", LOCAL_PLUGIN_CONFIG, "--anopt", "foo"] + + argv = ["--config", local_config, "--anopt", "foo"] + + stage1_parser = stage1_arg_parser() + stage1_args, rest = stage1_parser.parse_known_args(argv) + + cfg, cfg_dir = config.load_config( + config=stage1_args.config, extra=[], isolated=False ) - assert app.options is not None - assert app.options.anopt == "foo" + plugins = finder.find_plugins(cfg) + plugin_paths = finder.find_local_plugin_paths(cfg, cfg_dir) + loaded_plugins = finder.load_plugins(plugins, plugin_paths) + option_manager = OptionManager( + version="123", + plugin_versions="", + parents=[stage1_parser], + ) + register_default_options(option_manager) + option_manager.register_plugins(loaded_plugins) -def test_enable_local_plugin_at_non_installed_path(): - """Can add a paths option in local-plugins config section for finding.""" - app = application.Application() - app.initialize(["flake8", "--config", LOCAL_PLUGIN_PATH_CONFIG]) + args = aggregator.aggregate_options(option_manager, cfg, cfg_dir, argv) - assert app.check_plugins is not None - assert app.check_plugins["XE"].plugin.name == "ExtensionTestPlugin2" + assert args.anopt == "foo" class AlwaysErrors: - name = "AlwaysError" - version = "1" - def __init__(self, tree): pass diff --git a/tests/unit/plugins/__init__.py b/tests/unit/plugins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/plugins/finder_test.py b/tests/unit/plugins/finder_test.py new file mode 100644 index 0000000..993c13c --- /dev/null +++ b/tests/unit/plugins/finder_test.py @@ -0,0 +1,610 @@ +import configparser +import sys +from unittest import mock + +import pytest + +from flake8._compat import importlib_metadata +from flake8.exceptions import FailedToLoadPlugin +from flake8.plugins import finder +from flake8.plugins.pyflakes import FlakesChecker + + +def _ep(name="X", value="dne:dne", group="flake8.extension"): + return importlib_metadata.EntryPoint(name, value, group) + + +def _plugin(package="local", version="local", ep=None): + if ep is None: + ep = _ep() + return finder.Plugin(package, version, ep) + + +def _loaded(plugin=None, obj=None, parameters=None): + if plugin is None: + plugin = _plugin() + if parameters is None: + parameters = {"tree": True} + return finder.LoadedPlugin(plugin, obj, parameters) + + +def test_loaded_plugin_entry_name_vs_display_name(): + loaded = _loaded(_plugin(package="package-name", ep=_ep(name="Q"))) + assert loaded.entry_name == "Q" + assert loaded.display_name == "package-name[Q]" + + +def test_plugins_all_plugins(): + tree_plugin = _loaded(parameters={"tree": True}) + logical_line_plugin = _loaded(parameters={"logical_line": True}) + physical_line_plugin = _loaded(parameters={"physical_line": True}) + report_plugin = _loaded( + plugin=_plugin(ep=_ep(name="R", group="flake8.report")) + ) + + plugins = finder.Plugins( + checkers=finder.Checkers( + tree=[tree_plugin], + logical_line=[logical_line_plugin], + physical_line=[physical_line_plugin], + ), + reporters={"R": report_plugin}, + ) + + assert tuple(plugins.all_plugins()) == ( + tree_plugin, + logical_line_plugin, + physical_line_plugin, + report_plugin, + ) + + +def test_plugins_versions_str(): + plugins = finder.Plugins( + checkers=finder.Checkers( + tree=[_loaded(_plugin(package="pkg1", version="1"))], + logical_line=[_loaded(_plugin(package="pkg2", version="2"))], + physical_line=[_loaded(_plugin(package="pkg1", version="1"))], + ), + reporters={ + # ignore flake8 builtin plugins + "default": _loaded(_plugin(package="flake8")), + # ignore local plugins + "custom": _loaded(_plugin(package="local")), + }, + ) + assert plugins.versions_str() == "pkg1: 1, pkg2: 2" + + +@pytest.fixture +def pyflakes_dist(tmp_path): + metadata = """\ +Metadata-Version: 2.1 +Name: pyflakes +Version: 9000.1.0 +""" + d = tmp_path.joinpath("pyflakes.dist-info") + d.mkdir() + d.joinpath("METADATA").write_text(metadata) + return importlib_metadata.PathDistribution(d) + + +@pytest.fixture +def pycodestyle_dist(tmp_path): + metadata = """\ +Metadata-Version: 2.1 +Name: pycodestyle +Version: 9000.2.0 +""" + d = tmp_path.joinpath("pycodestyle.dist-info") + d.mkdir() + d.joinpath("METADATA").write_text(metadata) + return importlib_metadata.PathDistribution(d) + + +@pytest.fixture +def flake8_dist(tmp_path): + metadata = """\ +Metadata-Version: 2.1 +Name: flake8 +Version: 9001 +""" + entry_points = """\ +[console_scripts] +flake8 = flake8.main.cli:main + +[flake8.extension] +F = flake8.plugins.pyflakes:FlakesChecker +pycodestyle.bare_except = pycodestyle:bare_except +pycodestyle.blank_lines = pycodestyle:blank_lines + +[flake8.report] +default = flake8.formatting.default:Default +pylint = flake8.formatting.default:Pylint +""" + d = tmp_path.joinpath("flake8.dist-info") + d.mkdir() + d.joinpath("METADATA").write_text(metadata) + d.joinpath("entry_points.txt").write_text(entry_points) + return importlib_metadata.PathDistribution(d) + + +@pytest.fixture +def flake8_foo_dist(tmp_path): + metadata = """\ +Metadata-Version: 2.1 +Name: flake8-foo +Version: 1.2.3 +""" + eps = """\ +[console_scripts] +foo = flake8_foo:main +[flake8.extension] +Q = flake8_foo:Plugin +[flake8.report] +foo = flake8_foo:Formatter +""" + d = tmp_path.joinpath("flake8_foo.dist-info") + d.mkdir() + d.joinpath("METADATA").write_text(metadata) + d.joinpath("entry_points.txt").write_text(eps) + return importlib_metadata.PathDistribution(d) + + +@pytest.fixture +def mock_distribution(pyflakes_dist, pycodestyle_dist): + dists = {"pyflakes": pyflakes_dist, "pycodestyle": pycodestyle_dist} + with mock.patch.object(importlib_metadata, "distribution", dists.get): + yield + + +def test_flake8_plugins(flake8_dist, mock_distribution): + """Ensure entrypoints for flake8 are parsed specially.""" + + eps = flake8_dist.entry_points + ret = set(finder._flake8_plugins(eps, "flake8", "9001")) + assert ret == { + finder.Plugin( + "pyflakes", + "9000.1.0", + importlib_metadata.EntryPoint( + "F", + "flake8.plugins.pyflakes:FlakesChecker", + "flake8.extension", + ), + ), + finder.Plugin( + "pycodestyle", + "9000.2.0", + importlib_metadata.EntryPoint( + "pycodestyle.bare_except", + "pycodestyle:bare_except", + "flake8.extension", + ), + ), + finder.Plugin( + "pycodestyle", + "9000.2.0", + importlib_metadata.EntryPoint( + "pycodestyle.blank_lines", + "pycodestyle:blank_lines", + "flake8.extension", + ), + ), + finder.Plugin( + "flake8", + "9001", + importlib_metadata.EntryPoint( + "default", "flake8.formatting.default:Default", "flake8.report" + ), + ), + finder.Plugin( + "flake8", + "9001", + importlib_metadata.EntryPoint( + "pylint", "flake8.formatting.default:Pylint", "flake8.report" + ), + ), + } + + +def test_importlib_plugins( + tmp_path, + flake8_dist, + flake8_foo_dist, + mock_distribution, + caplog, +): + """Ensure we can load plugins from importlib_metadata.""" + + # make sure flake8-colors is skipped + flake8_colors_metadata = """\ +Metadata-Version: 2.1 +Name: flake8-colors +Version: 1.2.3 +""" + flake8_colors_eps = """\ +[flake8.extension] +flake8-colors = flake8_colors:ColorFormatter +""" + flake8_colors_d = tmp_path.joinpath("flake8_colors.dist-info") + flake8_colors_d.mkdir() + flake8_colors_d.joinpath("METADATA").write_text(flake8_colors_metadata) + flake8_colors_d.joinpath("entry_points.txt").write_text(flake8_colors_eps) + flake8_colors_dist = importlib_metadata.PathDistribution(flake8_colors_d) + + unrelated_metadata = """\ +Metadata-Version: 2.1 +Name: unrelated +Version: 4.5.6 +""" + unrelated_eps = """\ +[console_scripts] +unrelated = unrelated:main +""" + unrelated_d = tmp_path.joinpath("unrelated.dist-info") + unrelated_d.mkdir() + unrelated_d.joinpath("METADATA").write_text(unrelated_metadata) + unrelated_d.joinpath("entry_points.txt").write_text(unrelated_eps) + unrelated_dist = importlib_metadata.PathDistribution(unrelated_d) + + with mock.patch.object( + importlib_metadata, + "distributions", + return_value=[ + flake8_dist, + flake8_colors_dist, + flake8_foo_dist, + unrelated_dist, + ], + ): + ret = set(finder._find_importlib_plugins()) + + assert ret == { + finder.Plugin( + "flake8-foo", + "1.2.3", + importlib_metadata.EntryPoint( + "Q", "flake8_foo:Plugin", "flake8.extension" + ), + ), + finder.Plugin( + "pycodestyle", + "9000.2.0", + importlib_metadata.EntryPoint( + "pycodestyle.bare_except", + "pycodestyle:bare_except", + "flake8.extension", + ), + ), + finder.Plugin( + "pycodestyle", + "9000.2.0", + importlib_metadata.EntryPoint( + "pycodestyle.blank_lines", + "pycodestyle:blank_lines", + "flake8.extension", + ), + ), + finder.Plugin( + "pyflakes", + "9000.1.0", + importlib_metadata.EntryPoint( + "F", + "flake8.plugins.pyflakes:FlakesChecker", + "flake8.extension", + ), + ), + finder.Plugin( + "flake8", + "9001", + importlib_metadata.EntryPoint( + "default", "flake8.formatting.default:Default", "flake8.report" + ), + ), + finder.Plugin( + "flake8", + "9001", + importlib_metadata.EntryPoint( + "pylint", "flake8.formatting.default:Pylint", "flake8.report" + ), + ), + finder.Plugin( + "flake8-foo", + "1.2.3", + importlib_metadata.EntryPoint( + "foo", "flake8_foo:Formatter", "flake8.report" + ), + ), + } + + assert caplog.record_tuples == [ + ( + "flake8.plugins.finder", + 30, + "flake8-colors plugin is obsolete in flake8>=4.1", + ), + ] + + +def test_find_local_plugins_nothing(): + cfg = configparser.RawConfigParser() + assert set(finder._find_local_plugins(cfg)) == set() + + +@pytest.fixture +def local_plugin_cfg(): + cfg = configparser.RawConfigParser() + cfg.add_section("flake8:local-plugins") + cfg.set("flake8:local-plugins", "extension", "Y=mod2:attr, X = mod:attr") + cfg.set("flake8:local-plugins", "report", "Z=mod3:attr") + return cfg + + +def test_find_local_plugins(local_plugin_cfg): + ret = set(finder._find_local_plugins(local_plugin_cfg)) + assert ret == { + finder.Plugin( + "local", + "local", + importlib_metadata.EntryPoint( + "X", + "mod:attr", + "flake8.extension", + ), + ), + finder.Plugin( + "local", + "local", + importlib_metadata.EntryPoint( + "Y", + "mod2:attr", + "flake8.extension", + ), + ), + finder.Plugin( + "local", + "local", + importlib_metadata.EntryPoint( + "Z", + "mod3:attr", + "flake8.report", + ), + ), + } + + +def test_find_plugins( + tmp_path, + flake8_dist, + flake8_foo_dist, + mock_distribution, + local_plugin_cfg, +): + with mock.patch.object( + importlib_metadata, + "distributions", + return_value=[flake8_dist, flake8_foo_dist], + ): + ret = finder.find_plugins(local_plugin_cfg) + + assert ret == [ + finder.Plugin( + "flake8", + "9001", + importlib_metadata.EntryPoint( + "default", "flake8.formatting.default:Default", "flake8.report" + ), + ), + finder.Plugin( + "flake8", + "9001", + importlib_metadata.EntryPoint( + "pylint", "flake8.formatting.default:Pylint", "flake8.report" + ), + ), + finder.Plugin( + "flake8-foo", + "1.2.3", + importlib_metadata.EntryPoint( + "Q", "flake8_foo:Plugin", "flake8.extension" + ), + ), + finder.Plugin( + "flake8-foo", + "1.2.3", + importlib_metadata.EntryPoint( + "foo", "flake8_foo:Formatter", "flake8.report" + ), + ), + finder.Plugin( + "local", + "local", + importlib_metadata.EntryPoint("X", "mod:attr", "flake8.extension"), + ), + finder.Plugin( + "local", + "local", + importlib_metadata.EntryPoint( + "Y", "mod2:attr", "flake8.extension" + ), + ), + finder.Plugin( + "local", + "local", + importlib_metadata.EntryPoint("Z", "mod3:attr", "flake8.report"), + ), + finder.Plugin( + "pycodestyle", + "9000.2.0", + importlib_metadata.EntryPoint( + "pycodestyle.bare_except", + "pycodestyle:bare_except", + "flake8.extension", + ), + ), + finder.Plugin( + "pycodestyle", + "9000.2.0", + importlib_metadata.EntryPoint( + "pycodestyle.blank_lines", + "pycodestyle:blank_lines", + "flake8.extension", + ), + ), + finder.Plugin( + "pyflakes", + "9000.1.0", + importlib_metadata.EntryPoint( + "F", + "flake8.plugins.pyflakes:FlakesChecker", + "flake8.extension", + ), + ), + ] + + +def test_find_local_plugin_paths_missing(tmp_path): + cfg = configparser.RawConfigParser() + assert finder.find_local_plugin_paths(cfg, str(tmp_path)) == [] + + +def test_find_local_plugin_paths(tmp_path): + cfg = configparser.RawConfigParser() + cfg.add_section("flake8:local-plugins") + cfg.set("flake8:local-plugins", "paths", "./a, ./b") + ret = finder.find_local_plugin_paths(cfg, str(tmp_path)) + + assert ret == [str(tmp_path.joinpath("a")), str(tmp_path.joinpath("b"))] + + +def test_parameters_for_class_plugin(): + """Verify that we can retrieve the parameters for a class plugin.""" + + class FakeCheck: + def __init__(self, tree): + raise NotImplementedError + + assert finder._parameters_for(FakeCheck) == {"tree": True} + + +def test_parameters_for_function_plugin(): + """Verify that we retrieve the parameters for a function plugin.""" + + def fake_plugin(physical_line, self, tree, optional=None): + raise NotImplementedError + + assert finder._parameters_for(fake_plugin) == { + "physical_line": True, + "self": True, + "tree": True, + "optional": False, + } + + +def test_load_plugin_import_error(): + plugin = _plugin(ep=_ep(value="dne:dne")) + + with pytest.raises(FailedToLoadPlugin) as excinfo: + finder._load_plugin(plugin) + + pkg, e = excinfo.value.args + assert pkg == "local" + assert isinstance(e, ModuleNotFoundError) + + +def test_load_plugin_not_callable(): + plugin = _plugin(ep=_ep(value="os:curdir")) + + with pytest.raises(FailedToLoadPlugin) as excinfo: + finder._load_plugin(plugin) + + pkg, e = excinfo.value.args + assert pkg == "local" + assert isinstance(e, TypeError) + assert e.args == ("expected loaded plugin to be callable",) + + +def test_load_plugin_ok(): + plugin = _plugin(ep=_ep(value="flake8.plugins.pyflakes:FlakesChecker")) + + loaded = finder._load_plugin(plugin) + + assert loaded == finder.LoadedPlugin( + plugin, + FlakesChecker, + {"tree": True, "file_tokens": True, "filename": True}, + ) + + +@pytest.fixture +def reset_sys(): + orig_path = sys.path[:] + orig_modules = sys.modules.copy() + yield + sys.path[:] = orig_path + sys.modules.clear() + sys.modules.update(orig_modules) + + +@pytest.mark.usefixtures("reset_sys") +def test_import_plugins_extends_sys_path(): + plugin = _plugin(ep=_ep(value="aplugin:ExtensionTestPlugin2")) + + ret = finder._import_plugins([plugin], ["tests/integration/subdir"]) + + import aplugin + + assert ret == [ + finder.LoadedPlugin( + plugin, + aplugin.ExtensionTestPlugin2, + {"tree": True}, + ), + ] + + +def test_classify_plugins(): + report_plugin = _loaded( + plugin=_plugin(ep=_ep(name="R", group="flake8.report")) + ) + tree_plugin = _loaded(parameters={"tree": True}) + logical_line_plugin = _loaded(parameters={"logical_line": True}) + physical_line_plugin = _loaded(parameters={"physical_line": True}) + + classified = finder._classify_plugins( + [report_plugin, tree_plugin, logical_line_plugin, physical_line_plugin] + ) + + assert classified == finder.Plugins( + checkers=finder.Checkers( + tree=[tree_plugin], + logical_line=[logical_line_plugin], + physical_line=[physical_line_plugin], + ), + reporters={"R": report_plugin}, + ) + + +@pytest.mark.usefixtures("reset_sys") +def test_load_plugins(): + plugin = _plugin(ep=_ep(value="aplugin:ExtensionTestPlugin2")) + + ret = finder.load_plugins([plugin], ["tests/integration/subdir"]) + + import aplugin + + assert ret == finder.Plugins( + checkers=finder.Checkers( + tree=[ + finder.LoadedPlugin( + plugin, + aplugin.ExtensionTestPlugin2, + {"tree": True}, + ), + ], + logical_line=[], + physical_line=[], + ), + reporters={}, + ) diff --git a/tests/unit/plugins/reporter_test.py b/tests/unit/plugins/reporter_test.py new file mode 100644 index 0000000..4b46cc4 --- /dev/null +++ b/tests/unit/plugins/reporter_test.py @@ -0,0 +1,74 @@ +import argparse + +import pytest + +from flake8._compat import importlib_metadata +from flake8.formatting import default +from flake8.plugins import finder +from flake8.plugins import reporter + + +def _opts(**kwargs): + kwargs.setdefault("quiet", 0), + kwargs.setdefault("color", "never") + kwargs.setdefault("output_file", None) + return argparse.Namespace(**kwargs) + + +@pytest.fixture +def reporters(): + def _plugin(name, cls): + return finder.LoadedPlugin( + finder.Plugin( + "flake8", + "123", + importlib_metadata.EntryPoint( + name, f"{cls.__module__}:{cls.__name__}", "flake8.report" + ), + ), + cls, + {"options": True}, + ) + + return { + "default": _plugin("default", default.Default), + "pylint": _plugin("pylint", default.Pylint), + "quiet-filename": _plugin("quiet-filename", default.FilenameOnly), + "quiet-nothing": _plugin("quiet-nothing", default.Nothing), + } + + +def test_make_formatter_default(reporters): + ret = reporter.make(reporters, _opts(format="default")) + assert isinstance(ret, default.Default) + assert ret.error_format == default.Default.error_format + + +def test_make_formatter_quiet_filename(reporters): + ret = reporter.make(reporters, _opts(format="default", quiet=1)) + assert isinstance(ret, default.FilenameOnly) + + +@pytest.mark.parametrize("quiet", (2, 3)) +def test_make_formatter_very_quiet(reporters, quiet): + ret = reporter.make(reporters, _opts(format="default", quiet=quiet)) + assert isinstance(ret, default.Nothing) + + +def test_make_formatter_custom(reporters): + ret = reporter.make(reporters, _opts(format="pylint")) + assert isinstance(ret, default.Pylint) + + +def test_make_formatter_format_string(reporters, caplog): + ret = reporter.make(reporters, _opts(format="hi %(code)s")) + assert isinstance(ret, default.Default) + assert ret.error_format == "hi %(code)s" + + assert caplog.record_tuples == [ + ( + "flake8.plugins.reporter", + 30, + "'hi %(code)s' is an unknown formatter. Falling back to default.", + ) + ] diff --git a/tests/unit/test_application.py b/tests/unit/test_application.py index 9f4b1e8..508f83b 100644 --- a/tests/unit/test_application.py +++ b/tests/unit/test_application.py @@ -1,7 +1,5 @@ """Tests for the Application class.""" import argparse -import sys -from unittest import mock import pytest @@ -44,62 +42,3 @@ def test_application_exit_code( application.options = options(exit_zero=exit_zero) assert application.exit_code() == value - - -def test_warns_on_unknown_formatter_plugin_name(application): - """Verify we log a warning with an unfound plugin.""" - default = mock.Mock() - execute = default.execute - application.formatting_plugins = { - "default": default, - } - with mock.patch.object(app.LOG, "warning") as warning: - assert execute is application.formatter_for("fake-plugin-name") - - assert warning.called is True - assert warning.call_count == 1 - - -def test_returns_specified_plugin(application): - """Verify we get the plugin we want.""" - desired = mock.Mock() - execute = desired.execute - application.formatting_plugins = { - "default": mock.Mock(), - "desired": desired, - } - - with mock.patch.object(app.LOG, "warning") as warning: - assert execute is application.formatter_for("desired") - - assert warning.called is False - - -def test_prelim_opts_args(application): - """Verify we get sensible prelim opts and args.""" - opts, args = application.parse_preliminary_options( - ["--foo", "--verbose", "src", "setup.py", "--statistics", "--version"] - ) - - assert opts.verbose - assert args == ["--foo", "src", "setup.py", "--statistics", "--version"] - - -def test_prelim_opts_ignore_help(application): - """Verify -h/--help is not handled.""" - # GIVEN - - # WHEN - _, args = application.parse_preliminary_options(["--help", "-h"]) - - # THEN - assert args == ["--help", "-h"] - - -def test_prelim_opts_handles_empty(application): - """Verify empty argv lists are handled correctly.""" - irrelevant_args = ["myexe", "/path/to/foo"] - with mock.patch.object(sys, "argv", irrelevant_args): - opts, args = application.parse_preliminary_options([]) - - assert args == [] diff --git a/tests/unit/test_checker_manager.py b/tests/unit/test_checker_manager.py index 09a0f65..c6114f6 100644 --- a/tests/unit/test_checker_manager.py +++ b/tests/unit/test_checker_manager.py @@ -7,6 +7,7 @@ import pytest from flake8 import checker from flake8.main.options import JobsArgument +from flake8.plugins import finder def style_guide_mock(): @@ -22,7 +23,7 @@ def style_guide_mock(): def _parallel_checker_manager(): """Call Manager.run() and return the number of calls to `run_serial`.""" style_guide = style_guide_mock() - manager = checker.Manager(style_guide, []) + manager = checker.Manager(style_guide, finder.Checkers([], [], [])) # multiple checkers is needed for parallel mode manager.checkers = [mock.Mock(), mock.Mock()] return manager @@ -54,7 +55,7 @@ def test_oserrors_are_reraised(_): def test_multiprocessing_is_disabled(_): """Verify not being able to import multiprocessing forces jobs to 0.""" style_guide = style_guide_mock() - manager = checker.Manager(style_guide, []) + manager = checker.Manager(style_guide, finder.Checkers([], [], [])) assert manager.jobs == 0 @@ -68,7 +69,7 @@ def test_multiprocessing_cpu_count_not_implemented(): "cpu_count", side_effect=NotImplementedError, ): - manager = checker.Manager(style_guide, []) + manager = checker.Manager(style_guide, finder.Checkers([], [], [])) assert manager.jobs == 0 @@ -77,13 +78,7 @@ def test_make_checkers(_): """Verify that we create a list of FileChecker instances.""" style_guide = style_guide_mock() style_guide.options.filenames = ["file1", "file2"] - checkplugins = mock.Mock() - checkplugins.to_dictionary.return_value = { - "ast_plugins": [], - "logical_line_plugins": [], - "physical_line_plugins": [], - } - manager = checker.Manager(style_guide, checkplugins) + manager = checker.Manager(style_guide, finder.Checkers([], [], [])) with mock.patch("flake8.utils.fnmatch", return_value=True): with mock.patch("flake8.processor.FileProcessor"): diff --git a/tests/unit/test_debug.py b/tests/unit/test_debug.py index 28ab48e..284dec6 100644 --- a/tests/unit/test_debug.py +++ b/tests/unit/test_debug.py @@ -1,56 +1,47 @@ -"""Tests for our debugging module.""" from unittest import mock -import pytest - +from flake8._compat import importlib_metadata from flake8.main import debug +from flake8.plugins import finder -@pytest.mark.parametrize( - ("versions", "expected"), - ( - ([], []), - ( - [("p1", "1"), ("p2", "2"), ("p1", "1")], - [ - {"plugin": "p1", "version": "1"}, - {"plugin": "p2", "version": "2"}, +def test_debug_information(): + def _plugin(pkg, version, ep_name): + return finder.LoadedPlugin( + finder.Plugin( + pkg, + version, + importlib_metadata.EntryPoint( + ep_name, "dne:dne", "flake8.extension" + ), + ), + None, + {}, + ) + + plugins = finder.Plugins( + checkers=finder.Checkers( + tree=[ + _plugin("pkg1", "1.2.3", "X1"), + _plugin("pkg1", "1.2.3", "X2"), + _plugin("pkg2", "4.5.6", "X3"), ], + logical_line=[], + physical_line=[], ), - ), -) -def test_plugins_from(versions, expected): - """Test that we format plugins appropriately.""" - option_manager = mock.Mock(**{"manager.versions.return_value": versions}) - assert expected == debug.plugins_from(option_manager) + reporters={}, + ) - -@mock.patch("platform.python_implementation", return_value="CPython") -@mock.patch("platform.python_version", return_value="3.5.3") -@mock.patch("platform.system", return_value="Linux") -def test_information(system, pyversion, pyimpl): - """Verify that we return all the information we care about.""" - expected = { - "version": "3.1.0", + info = debug.information("9001", plugins) + assert info == { + "version": "9001", "plugins": [ - {"plugin": "mccabe", "version": "0.5.9"}, - {"plugin": "pycodestyle", "version": "2.0.0"}, + {"plugin": "pkg1", "version": "1.2.3"}, + {"plugin": "pkg2", "version": "4.5.6"}, ], "platform": { - "python_implementation": "CPython", - "python_version": "3.5.3", - "system": "Linux", + "python_implementation": mock.ANY, + "python_version": mock.ANY, + "system": mock.ANY, }, } - plugins = mock.Mock( - **{ - "manager.versions.return_value": [ - ("pycodestyle", "2.0.0"), - ("mccabe", "0.5.9"), - ] - } - ) - assert expected == debug.information("3.1.0", plugins) - pyimpl.assert_called_once_with() - pyversion.assert_called_once_with() - system.assert_called_once_with() diff --git a/tests/unit/test_file_checker.py b/tests/unit/test_file_checker.py index bcc8b32..ee4f745 100644 --- a/tests/unit/test_file_checker.py +++ b/tests/unit/test_file_checker.py @@ -1,26 +1,33 @@ """Unit tests for the FileChecker class.""" +import argparse from unittest import mock import pytest import flake8 from flake8 import checker +from flake8._compat import importlib_metadata +from flake8.plugins import finder @mock.patch("flake8.checker.FileChecker._make_processor", return_value=None) def test_repr(*args): """Verify we generate a correct repr.""" file_checker = checker.FileChecker( - "example.py", - checks={}, - options=object(), + filename="example.py", + plugins=finder.Checkers([], [], []), + options=argparse.Namespace(), ) assert repr(file_checker) == "FileChecker for example.py" def test_nonexistent_file(): """Verify that checking non-existent file results in an error.""" - c = checker.FileChecker("foobar.py", checks={}, options=object()) + c = checker.FileChecker( + filename="example.py", + plugins=finder.Checkers([], [], []), + options=argparse.Namespace(), + ) assert c.processor is None assert not c.should_process @@ -31,17 +38,21 @@ def test_nonexistent_file(): def test_raises_exception_on_failed_plugin(tmp_path, default_options): """Checks that a failing plugin results in PluginExecutionFailed.""" - foobar = tmp_path / "foobar.py" - foobar.write_text("I exist!") # Create temp file - plugin = { - "name": "failure", - "plugin_name": "failure", # Both are necessary - "parameters": dict(), - "plugin": mock.MagicMock(side_effect=ValueError), - } - """Verify a failing plugin results in an plugin error""" + fname = tmp_path.joinpath("t.py") + fname.touch() + plugin = finder.LoadedPlugin( + finder.Plugin( + "plugin-name", + "1.2.3", + importlib_metadata.EntryPoint("X", "dne:dne", "flake8.extension"), + ), + mock.Mock(side_effect=ValueError), + {}, + ) fchecker = checker.FileChecker( - str(foobar), checks=[], options=default_options + filename=str(fname), + plugins=finder.Checkers([], [], []), + options=default_options, ) with pytest.raises(flake8.exceptions.PluginExecutionFailed): fchecker.run_check(plugin) diff --git a/tests/unit/test_main_options.py b/tests/unit/test_main_options.py new file mode 100644 index 0000000..aea2071 --- /dev/null +++ b/tests/unit/test_main_options.py @@ -0,0 +1,17 @@ +from flake8.main import options + + +def test_stage1_arg_parser(): + stage1_parser = options.stage1_arg_parser() + opts, args = stage1_parser.parse_known_args( + ["--foo", "--verbose", "src", "setup.py", "--statistics", "--version"] + ) + + assert opts.verbose + assert args == ["--foo", "src", "setup.py", "--statistics", "--version"] + + +def test_stage1_arg_parser_ignores_help(): + stage1_parser = options.stage1_arg_parser() + _, args = stage1_parser.parse_known_args(["--help", "-h"]) + assert args == ["--help", "-h"] diff --git a/tests/unit/test_option_manager.py b/tests/unit/test_option_manager.py index c6006d3..a5e3d33 100644 --- a/tests/unit/test_option_manager.py +++ b/tests/unit/test_option_manager.py @@ -252,17 +252,6 @@ def test_optparse_normalize_help(optmanager, capsys): assert "default: bar" in output -def test_optmanager_group(optmanager, capsys): - """Test that group(...) causes options to be assigned to a group.""" - with optmanager.group("groupname"): - optmanager.add_option("--foo") - with pytest.raises(SystemExit): - optmanager.parse_args(["--help"]) - out, err = capsys.readouterr() - output = out + err - assert "\ngroupname:\n" in output - - @pytest.mark.parametrize( ("s", "is_auto", "n_jobs"), ( diff --git a/tests/unit/test_plugin.py b/tests/unit/test_plugin.py deleted file mode 100644 index c41198e..0000000 --- a/tests/unit/test_plugin.py +++ /dev/null @@ -1,169 +0,0 @@ -"""Tests for flake8.plugins.manager.Plugin.""" -import argparse -from unittest import mock - -import pytest - -from flake8 import exceptions -from flake8.options import manager as options_manager -from flake8.plugins import manager - - -def test_load_plugin_fallsback_on_old_setuptools(): - """Verify we fallback gracefully to on old versions of setuptools.""" - entry_point = mock.Mock(spec=["load"]) - plugin = manager.Plugin("T000", entry_point) - - plugin.load_plugin() - entry_point.load.assert_called_once_with() - - -def test_load_plugin_is_idempotent(): - """Verify we use the preferred methods on new versions of setuptools.""" - entry_point = mock.Mock(spec=["load"]) - plugin = manager.Plugin("T000", entry_point) - - plugin.load_plugin() - plugin.load_plugin() - plugin.load_plugin() - entry_point.load.assert_called_once_with() - - -def test_load_plugin_catches_and_reraises_exceptions(): - """Verify we raise our own FailedToLoadPlugin.""" - entry_point = mock.Mock(spec=["load"]) - entry_point.load.side_effect = ValueError("Test failure") - plugin = manager.Plugin("T000", entry_point) - - with pytest.raises(exceptions.FailedToLoadPlugin): - plugin.load_plugin() - - -def test_load_noncallable_plugin(): - """Verify that we do not load a non-callable plugin.""" - entry_point = mock.Mock(spec=["load"]) - entry_point.load.return_value = mock.NonCallableMock() - plugin = manager.Plugin("T000", entry_point) - - with pytest.raises(exceptions.FailedToLoadPlugin): - plugin.load_plugin() - entry_point.load.assert_called_once_with() - - -def test_plugin_property_loads_plugin_on_first_use(): - """Verify that we load our plugin when we first try to use it.""" - entry_point = mock.Mock(spec=["load"]) - plugin = manager.Plugin("T000", entry_point) - - assert plugin.plugin is not None - entry_point.load.assert_called_once_with() - - -def test_execute_calls_plugin_with_passed_arguments(): - """Verify that we pass arguments directly to the plugin.""" - entry_point = mock.Mock(spec=["load"]) - plugin_obj = mock.Mock() - plugin = manager.Plugin("T000", entry_point) - plugin._plugin = plugin_obj - - plugin.execute("arg1", "arg2", kwarg1="value1", kwarg2="value2") - plugin_obj.assert_called_once_with( - "arg1", "arg2", kwarg1="value1", kwarg2="value2" - ) - - # Extra assertions - assert entry_point.load.called is False - - -def test_version_proxies_to_the_plugin(): - """Verify that we pass arguments directly to the plugin.""" - entry_point = mock.Mock(spec=["load"]) - plugin_obj = mock.Mock(spec_set=["version"]) - plugin_obj.version = "a.b.c" - plugin = manager.Plugin("T000", entry_point) - plugin._plugin = plugin_obj - - assert plugin.version == "a.b.c" - - -def test_register_options(): - """Verify we call add_options on the plugin only if it exists.""" - # Set up our mocks and Plugin object - entry_point = mock.Mock(spec=["load"]) - plugin_obj = mock.Mock( - spec_set=["name", "version", "add_options", "parse_options"] - ) - option_manager = mock.MagicMock(spec=options_manager.OptionManager) - plugin = manager.Plugin("T000", entry_point) - plugin._plugin = plugin_obj - - # Call the method we're testing. - plugin.register_options(option_manager) - - # Assert that we call add_options - plugin_obj.add_options.assert_called_once_with(option_manager) - - -def test_register_options_checks_plugin_for_method(): - """Verify we call add_options on the plugin only if it exists.""" - # Set up our mocks and Plugin object - entry_point = mock.Mock(spec=["load"]) - plugin_obj = mock.Mock(spec_set=["name", "version", "parse_options"]) - option_manager = mock.Mock(spec=["register_plugin"]) - plugin = manager.Plugin("T000", entry_point) - plugin._plugin = plugin_obj - - # Call the method we're testing. - plugin.register_options(option_manager) - - # Assert that we register the plugin - assert option_manager.register_plugin.called is False - - -def test_provide_options(): - """Verify we call add_options on the plugin only if it exists.""" - # Set up our mocks and Plugin object - entry_point = mock.Mock(spec=["load"]) - plugin_obj = mock.Mock( - spec_set=["name", "version", "add_options", "parse_options"] - ) - option_values = argparse.Namespace(enable_extensions=[]) - option_manager = mock.Mock() - plugin = manager.Plugin("T000", entry_point) - plugin._plugin = plugin_obj - - # Call the method we're testing. - plugin.provide_options(option_manager, option_values, None) - - # Assert that we call add_options - plugin_obj.parse_options.assert_called_once_with( - option_manager, option_values, None - ) - - -@pytest.mark.parametrize( - "ignore_list, code, expected_list", - [ - (["E", "W", "F", "C9"], "W", ["E", "F", "C9"]), - (["E", "W", "F"], "C9", ["E", "W", "F"]), - ], -) -def test_enable(ignore_list, code, expected_list): - """Verify that enabling a plugin removes it from the ignore list.""" - options = mock.Mock(ignore=ignore_list) - optmanager = mock.Mock() - plugin = manager.Plugin(code, mock.Mock()) - - plugin.enable(optmanager, options) - - assert options.ignore == expected_list - - -def test_enable_without_providing_parsed_options(): - """Verify that enabling a plugin removes it from the ignore list.""" - optmanager = mock.Mock() - plugin = manager.Plugin("U4", mock.Mock()) - - plugin.enable(optmanager) - - optmanager.remove_from_default_ignore.assert_called_once_with(["U4"]) diff --git a/tests/unit/test_plugin_manager.py b/tests/unit/test_plugin_manager.py deleted file mode 100644 index 5a38a38..0000000 --- a/tests/unit/test_plugin_manager.py +++ /dev/null @@ -1,57 +0,0 @@ -"""Tests for flake8.plugins.manager.PluginManager.""" -from unittest import mock - -from flake8._compat import importlib_metadata -from flake8.plugins import manager - - -@mock.patch.object(importlib_metadata, "entry_points") -def test_calls_entrypoints_on_instantiation(entry_points_mck): - """Verify that we call entry_points() when we create a manager.""" - entry_points_mck.return_value = {} - manager.PluginManager(namespace="testing.entrypoints") - entry_points_mck.assert_called_once_with() - - -@mock.patch.object(importlib_metadata, "entry_points") -def test_calls_entrypoints_creates_plugins_automaticaly(entry_points_mck): - """Verify that we create Plugins on instantiation.""" - entry_points_mck.return_value = { - "testing.entrypoints": [ - importlib_metadata.EntryPoint("T100", "", "testing.entrypoints"), - importlib_metadata.EntryPoint("T200", "", "testing.entrypoints"), - ], - } - plugin_mgr = manager.PluginManager(namespace="testing.entrypoints") - - entry_points_mck.assert_called_once_with() - assert "T100" in plugin_mgr.plugins - assert "T200" in plugin_mgr.plugins - assert isinstance(plugin_mgr.plugins["T100"], manager.Plugin) - assert isinstance(plugin_mgr.plugins["T200"], manager.Plugin) - - -@mock.patch.object(importlib_metadata, "entry_points") -def test_handles_mapping_functions_across_plugins(entry_points_mck): - """Verify we can use the PluginManager call functions on all plugins.""" - entry_points_mck.return_value = { - "testing.entrypoints": [ - importlib_metadata.EntryPoint("T100", "", "testing.entrypoints"), - importlib_metadata.EntryPoint("T200", "", "testing.entrypoints"), - ], - } - plugin_mgr = manager.PluginManager(namespace="testing.entrypoints") - plugins = [plugin_mgr.plugins[name] for name in plugin_mgr.names] - - assert list(plugin_mgr.map(lambda x: x)) == plugins - - -@mock.patch.object(importlib_metadata, "entry_points") -def test_local_plugins(entry_points_mck): - """Verify PluginManager can load given local plugins.""" - entry_points_mck.return_value = {} - plugin_mgr = manager.PluginManager( - namespace="testing.entrypoints", local_plugins=["X = path.to:Plugin"] - ) - - assert plugin_mgr.plugins["X"].entry_point.value == "path.to:Plugin" diff --git a/tests/unit/test_plugin_type_manager.py b/tests/unit/test_plugin_type_manager.py deleted file mode 100644 index ed4fa8c..0000000 --- a/tests/unit/test_plugin_type_manager.py +++ /dev/null @@ -1,177 +0,0 @@ -"""Tests for flake8.plugins.manager.PluginTypeManager.""" -from unittest import mock - -import pytest - -from flake8 import exceptions -from flake8.plugins import manager - -TEST_NAMESPACE = "testing.plugin-type-manager" - - -def create_plugin_mock(raise_exception=False): - """Create an auto-spec'd mock of a flake8 Plugin.""" - plugin = mock.create_autospec(manager.Plugin, instance=True) - if raise_exception: - plugin.load_plugin.side_effect = exceptions.FailedToLoadPlugin( - plugin_name="T101", - exception=ValueError("Test failure"), - ) - return plugin - - -def create_mapping_manager_mock(plugins): - """Create a mock for the PluginManager.""" - # Have a function that will actually call the method underneath - def fake_map(func): - for plugin in plugins.values(): - yield func(plugin) - - # Mock out the PluginManager instance - manager_mock = mock.Mock(spec=["map"]) - # Replace the map method - manager_mock.map = fake_map - # Store the plugins - manager_mock.plugins = plugins - return manager_mock - - -class FakeTestType(manager.PluginTypeManager): - """Fake PluginTypeManager.""" - - namespace = TEST_NAMESPACE - - -@mock.patch("flake8.plugins.manager.PluginManager", autospec=True) -def test_instantiates_a_manager(PluginManager): # noqa: N803 - """Verify we create a PluginManager on instantiation.""" - FakeTestType() - - PluginManager.assert_called_once_with(TEST_NAMESPACE, local_plugins=None) - - -@mock.patch("flake8.plugins.manager.PluginManager", autospec=True) -def test_proxies_names_to_manager(PluginManager): # noqa: N803 - """Verify we proxy the names attribute.""" - PluginManager.return_value = mock.Mock(names=["T100", "T200", "T300"]) - type_mgr = FakeTestType() - - assert type_mgr.names == ["T100", "T200", "T300"] - - -@mock.patch("flake8.plugins.manager.PluginManager", autospec=True) -def test_proxies_plugins_to_manager(PluginManager): # noqa: N803 - """Verify we proxy the plugins attribute.""" - PluginManager.return_value = mock.Mock(plugins=["T100", "T200", "T300"]) - type_mgr = FakeTestType() - - assert type_mgr.plugins == ["T100", "T200", "T300"] - - -def test_generate_call_function(): - """Verify the function we generate.""" - optmanager = object() - plugin = mock.Mock(method_name=lambda x: x) - func = manager.PluginTypeManager._generate_call_function( - "method_name", - optmanager, - ) - - assert callable(func) - assert func(plugin) is optmanager - - -@mock.patch("flake8.plugins.manager.PluginManager", autospec=True) -def test_load_plugins(PluginManager): # noqa: N803 - """Verify load plugins loads *every* plugin.""" - # Create a bunch of fake plugins - plugins = {"T10%i" % i: create_plugin_mock() for i in range(8)} - # Return our PluginManager mock - PluginManager.return_value.plugins = plugins - - type_mgr = FakeTestType() - # Load the plugins (do what we're actually testing) - type_mgr.load_plugins() - # Assert that our closure does what we think it does - for plugin in plugins.values(): - plugin.load_plugin.assert_called_once_with() - assert type_mgr.plugins_loaded is True - - -@mock.patch("flake8.plugins.manager.PluginManager") -def test_load_plugins_fails(PluginManager): # noqa: N803 - """Verify load plugins bubbles up exceptions.""" - plugins_list = [create_plugin_mock(i == 1) for i in range(8)] - plugins = {"T10%i" % i: plugin for i, plugin in enumerate(plugins_list)} - # Return our PluginManager mock - PluginManager.return_value.plugins = plugins - - type_mgr = FakeTestType() - with pytest.raises(exceptions.FailedToLoadPlugin): - type_mgr.load_plugins() - - # Assert we didn't finish loading plugins - assert type_mgr.plugins_loaded is False - # Assert the first two plugins had their load_plugin method called - plugins_list[0].load_plugin.assert_called_once_with() - plugins_list[1].load_plugin.assert_called_once_with() - # Assert the rest of the plugins were not loaded - for plugin in plugins_list[2:]: - assert plugin.load_plugin.called is False - - -@mock.patch("flake8.plugins.manager.PluginManager") -def test_register_options(PluginManager): # noqa: N803 - """Test that we map over every plugin to register options.""" - plugins = {"T10%i" % i: create_plugin_mock() for i in range(8)} - # Return our PluginManager mock - PluginManager.return_value = create_mapping_manager_mock(plugins) - optmanager = object() - - type_mgr = FakeTestType() - type_mgr.register_options(optmanager) - - for plugin in plugins.values(): - plugin.register_options.assert_called_with(optmanager) - - -@mock.patch("flake8.plugins.manager.PluginManager") -def test_provide_options(PluginManager): # noqa: N803 - """Test that we map over every plugin to provide parsed options.""" - plugins = {"T10%i" % i: create_plugin_mock() for i in range(8)} - # Return our PluginManager mock - PluginManager.return_value = create_mapping_manager_mock(plugins) - optmanager = object() - options = object() - - type_mgr = FakeTestType() - type_mgr.provide_options(optmanager, options, []) - - for plugin in plugins.values(): - plugin.provide_options.assert_called_with(optmanager, options, []) - - -@mock.patch("flake8.plugins.manager.PluginManager", autospec=True) -def test_proxy_contains_to_managers_plugins_dict(PluginManager): # noqa: N803 - """Verify that we proxy __contains__ to the manager's dictionary.""" - plugins = {"T10%i" % i: create_plugin_mock() for i in range(8)} - # Return our PluginManager mock - PluginManager.return_value.plugins = plugins - - type_mgr = FakeTestType() - for i in range(8): - key = "T10%i" % i - assert key in type_mgr - - -@mock.patch("flake8.plugins.manager.PluginManager") -def test_proxies_getitem_to_managers_plugins_dict(PluginManager): # noqa: N803 - """Verify that we can use the PluginTypeManager like a dictionary.""" - plugins = {"T10%i" % i: create_plugin_mock() for i in range(8)} - # Return our PluginManager mock - PluginManager.return_value.plugins = plugins - - type_mgr = FakeTestType() - for i in range(8): - key = "T10%i" % i - assert type_mgr[key] is plugins[key] diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index eb76572..167ba72 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -9,7 +9,6 @@ import pytest from flake8 import exceptions from flake8 import utils -from flake8.plugins import manager as plugin_manager RELATIVE_PATHS = ["flake8", "pep8", "pyflakes", "mccabe"] @@ -181,34 +180,6 @@ def test_fnmatch(filename, patterns, expected): assert utils.fnmatch(filename, patterns) is expected -def test_parameters_for_class_plugin(): - """Verify that we can retrieve the parameters for a class plugin.""" - - class FakeCheck: - def __init__(self, tree): - raise NotImplementedError - - plugin = plugin_manager.Plugin("plugin-name", object()) - plugin._plugin = FakeCheck - assert utils.parameters_for(plugin) == {"tree": True} - - -def test_parameters_for_function_plugin(): - """Verify that we retrieve the parameters for a function plugin.""" - - def fake_plugin(physical_line, self, tree, optional=None): - raise NotImplementedError - - plugin = plugin_manager.Plugin("plugin-name", object()) - plugin._plugin = fake_plugin - assert utils.parameters_for(plugin) == { - "physical_line": True, - "self": True, - "tree": True, - "optional": False, - } - - def read_diff_file(filename): """Read the diff file in its entirety.""" with open(filename) as fd: