rework plugin loading

This commit is contained in:
Anthony Sottile 2021-12-31 13:11:00 -08:00
parent 38c5eceda9
commit 50d69150c1
36 changed files with 1277 additions and 1505 deletions

View file

@ -1,4 +1,5 @@
"""Checker Manager and Checker classes."""
import argparse
import collections
import errno
import itertools
@ -6,6 +7,7 @@ import logging
import multiprocessing.pool
import signal
import tokenize
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
@ -16,6 +18,9 @@ from flake8 import exceptions
from flake8 import processor
from flake8 import utils
from flake8.discover_files import expand_paths
from flake8.plugins.finder import Checkers
from flake8.plugins.finder import LoadedPlugin
from flake8.style_guide import StyleGuideManager
Results = List[Tuple[str, int, int, str, Optional[str]]]
@ -56,21 +61,15 @@ class Manager:
together and make our output deterministic.
"""
def __init__(self, style_guide, checker_plugins):
"""Initialize our Manager instance.
:param style_guide:
The instantiated style guide for this instance of Flake8.
:type style_guide:
flake8.style_guide.StyleGuide
:param checker_plugins:
The plugins representing checks parsed from entry-points.
:type checker_plugins:
flake8.plugins.manager.Checkers
"""
def __init__(
self,
style_guide: StyleGuideManager,
plugins: Checkers,
) -> None:
"""Initialize our Manager instance."""
self.style_guide = style_guide
self.options = style_guide.options
self.checks = checker_plugins
self.plugins = plugins
self.jobs = self._job_count()
self._all_checkers: List[FileChecker] = []
self.checkers: List[FileChecker] = []
@ -158,9 +157,12 @@ class Manager:
if paths is None:
paths = self.options.filenames
checks = self.checks.to_dictionary()
self._all_checkers = [
FileChecker(filename, checks, self.options)
FileChecker(
filename=filename,
plugins=self.plugins,
options=self.options,
)
for filename in expand_paths(
paths=paths,
stdin_display_name=self.options.stdin_display_name,
@ -273,23 +275,17 @@ class Manager:
class FileChecker:
"""Manage running checks for a file and aggregate the results."""
def __init__(self, filename, checks, options):
"""Initialize our file checker.
:param str filename:
Name of the file to check.
:param checks:
The plugins registered to check the file.
:type checks:
dict
:param options:
Parsed option values from config and command-line.
:type options:
argparse.Namespace
"""
def __init__(
self,
*,
filename: str,
plugins: Checkers,
options: argparse.Namespace,
) -> None:
"""Initialize our file checker."""
self.options = options
self.filename = filename
self.checks = checks
self.plugins = plugins
self.results: Results = []
self.statistics = {
"tokens": 0,
@ -342,29 +338,27 @@ class FileChecker:
self.results.append((error_code, line_number, column, text, line))
return error_code
def run_check(self, plugin, **arguments):
def run_check(self, plugin: LoadedPlugin, **arguments: Any) -> Any:
"""Run the check in a single plugin."""
LOG.debug("Running %r with %r", plugin, arguments)
assert self.processor is not None
try:
self.processor.keyword_arguments_for(
plugin["parameters"], arguments
)
self.processor.keyword_arguments_for(plugin.parameters, arguments)
except AttributeError as ae:
LOG.error("Plugin requested unknown parameters.")
raise exceptions.PluginRequestedUnknownParameters(
plugin_name=plugin["plugin_name"], exception=ae
plugin_name=plugin.plugin.package, exception=ae
)
try:
return plugin["plugin"](**arguments)
return plugin.obj(**arguments)
except Exception as all_exc:
LOG.critical(
"Plugin %s raised an unexpected exception",
plugin["name"],
plugin.display_name,
exc_info=True,
)
raise exceptions.PluginExecutionFailed(
plugin_name=plugin["plugin_name"], exception=all_exc
plugin_name=plugin.display_name, exception=all_exc
)
@staticmethod
@ -431,7 +425,7 @@ class FileChecker:
assert self.processor is not None
ast = self.processor.build_ast()
for plugin in self.checks["ast_plugins"]:
for plugin in self.plugins.tree:
checker = self.run_check(plugin, tree=ast)
# If the plugin uses a class, call the run method of it, otherwise
# the call should return something iterable itself
@ -457,7 +451,7 @@ class FileChecker:
LOG.debug('Logical line: "%s"', logical_line.rstrip())
for plugin in self.checks["logical_line_plugins"]:
for plugin in self.plugins.logical_line:
self.processor.update_checker_state_for(plugin)
results = self.run_check(plugin, logical_line=logical_line) or ()
for offset, text in results:
@ -479,7 +473,7 @@ class FileChecker:
A single physical check may return multiple errors.
"""
assert self.processor is not None
for plugin in self.checks["physical_line_plugins"]:
for plugin in self.plugins.physical_line:
self.processor.update_checker_state_for(plugin)
result = self.run_check(plugin, physical_line=physical_line)

View file

@ -3,7 +3,6 @@ import argparse
import configparser
import json
import logging
import sys
import time
from typing import Dict
from typing import List
@ -12,7 +11,6 @@ from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
import flake8
from flake8 import checker
@ -20,15 +18,14 @@ from flake8 import defaults
from flake8 import exceptions
from flake8 import style_guide
from flake8 import utils
from flake8.formatting.base import BaseFormatter
from flake8.main import debug
from flake8.main import options
from flake8.options import aggregator
from flake8.options import config
from flake8.options import manager
from flake8.plugins import manager as plugin_manager
if TYPE_CHECKING:
from flake8.formatting.base import BaseFormatter
from flake8.plugins import finder
from flake8.plugins import reporter
LOG = logging.getLogger(__name__)
@ -56,12 +53,7 @@ class Application:
#: to parse and handle the options and arguments passed by the user
self.option_manager: Optional[manager.OptionManager] = None
#: The instance of :class:`flake8.plugins.manager.Checkers`
self.check_plugins: Optional[plugin_manager.Checkers] = None
#: The instance of :class:`flake8.plugins.manager.ReportFormatters`
self.formatting_plugins: Optional[
plugin_manager.ReportFormatters
] = None
self.plugins: Optional[finder.Plugins] = None
#: The user-selected formatter from :attr:`formatting_plugins`
self.formatter: Optional[BaseFormatter] = None
#: The :class:`flake8.style_guide.StyleGuideManager` built from the
@ -130,49 +122,23 @@ class Application:
) -> None:
"""Find and load the plugins for this application.
Set the :attr:`check_plugins` and :attr:`formatting_plugins` attributes
based on the discovered plugins found.
Set :attr:`plugins` based on loaded plugins.
"""
# TODO: move to src/flake8/plugins/finder.py
extension_local = utils.parse_comma_separated_list(
cfg.get("flake8:local-plugins", "extension", fallback="").strip(),
regexp=utils.LOCAL_PLUGIN_LIST_RE,
)
report_local = utils.parse_comma_separated_list(
cfg.get("flake8:local-plugins", "report", fallback="").strip(),
regexp=utils.LOCAL_PLUGIN_LIST_RE,
)
paths_s = cfg.get("flake8:local-plugins", "paths", fallback="").strip()
local_paths = utils.parse_comma_separated_list(paths_s)
local_paths = utils.normalize_paths(local_paths, cfg_dir)
sys.path.extend(local_paths)
self.check_plugins = plugin_manager.Checkers(extension_local)
self.formatting_plugins = plugin_manager.ReportFormatters(report_local)
self.check_plugins.load_plugins()
self.formatting_plugins.load_plugins()
raw_plugins = finder.find_plugins(cfg)
local_plugin_paths = finder.find_local_plugin_paths(cfg, cfg_dir)
self.plugins = finder.load_plugins(raw_plugins, local_plugin_paths)
def register_plugin_options(self) -> None:
"""Register options provided by plugins to our option manager."""
assert self.check_plugins is not None
assert self.formatting_plugins is not None
assert self.plugins is not None
versions = sorted(set(self.check_plugins.manager.versions()))
self.option_manager = manager.OptionManager(
version=flake8.__version__,
plugin_versions=", ".join(
f"{name}: {version}" for name, version in versions
),
plugin_versions=self.plugins.versions_str(),
parents=[self.prelim_arg_parser],
)
options.register_default_options(self.option_manager)
self.check_plugins.register_options(self.option_manager)
self.formatting_plugins.register_options(self.option_manager)
self.option_manager.register_plugins(self.plugins)
def parse_configuration_and_cli(
self,
@ -182,6 +148,7 @@ class Application:
) -> None:
"""Parse configuration files and the CLI options."""
assert self.option_manager is not None
assert self.plugins is not None
self.options = aggregator.aggregate_options(
self.option_manager,
cfg,
@ -190,8 +157,7 @@ class Application:
)
if self.options.bug_report:
assert self.check_plugins is not None
info = debug.information(flake8.__version__, self.check_plugins)
info = debug.information(flake8.__version__, self.plugins)
print(json.dumps(info, indent=2, sort_keys=True))
raise SystemExit(0)
@ -202,44 +168,28 @@ class Application:
)
self.parsed_diff = utils.parse_unified_diff()
assert self.check_plugins is not None
self.check_plugins.provide_options(
self.option_manager, self.options, self.options.filenames
)
assert self.formatting_plugins is not None
self.formatting_plugins.provide_options(
self.option_manager, self.options, self.options.filenames
)
for loaded in self.plugins.all_plugins():
parse_options = getattr(loaded.obj, "parse_options", None)
if parse_options is None:
continue
def formatter_for(self, formatter_plugin_name):
"""Retrieve the formatter class by plugin name."""
assert self.formatting_plugins is not None
default_formatter = self.formatting_plugins["default"]
formatter_plugin = self.formatting_plugins.get(formatter_plugin_name)
if formatter_plugin is None:
LOG.warning(
'"%s" is an unknown formatter. Falling back to default.',
formatter_plugin_name,
)
formatter_plugin = default_formatter
return formatter_plugin.execute
# XXX: ideally we would't have two forms of parse_options
try:
parse_options(
self.option_manager,
self.options,
self.options.filenames,
)
except TypeError:
parse_options(self.options)
def make_formatter(
self, formatter_class: Optional[Type["BaseFormatter"]] = None
self, formatter_class: Optional[Type[BaseFormatter]] = None
) -> None:
"""Initialize a formatter based on the parsed options."""
assert self.plugins is not None
assert self.options is not None
format_plugin = self.options.format
if 1 <= self.options.quiet < 2:
format_plugin = "quiet-filename"
elif 2 <= self.options.quiet:
format_plugin = "quiet-nothing"
if formatter_class is None:
formatter_class = self.formatter_for(format_plugin)
self.formatter = formatter_class(self.options)
self.formatter = reporter.make(self.plugins.reporters, self.options)
def make_guide(self) -> None:
"""Initialize our StyleGuide."""
@ -254,9 +204,11 @@ class Application:
def make_file_checker_manager(self) -> None:
"""Initialize our FileChecker Manager."""
assert self.guide is not None
assert self.plugins is not None
self.file_checker_manager = checker.Manager(
style_guide=self.guide,
checker_plugins=self.check_plugins,
plugins=self.plugins.checkers,
)
def run_checks(self) -> None:

View file

@ -2,30 +2,28 @@
import platform
from typing import Any
from typing import Dict
from typing import List
from flake8.plugins.manager import PluginTypeManager
from flake8.plugins.finder import Plugins
def information(
version: str,
plugins: PluginTypeManager,
) -> Dict[str, Any]:
def information(version: str, plugins: Plugins) -> Dict[str, Any]:
"""Generate the information to be printed for the bug report."""
versions = sorted(
{
(loaded.plugin.package, loaded.plugin.version)
for loaded in plugins.all_plugins()
if loaded.plugin.package not in {"flake8", "local"}
}
)
return {
"version": version,
"plugins": plugins_from(plugins),
"plugins": [
{"plugin": plugin, "version": version}
for plugin, version in versions
],
"platform": {
"python_implementation": platform.python_implementation(),
"python_version": platform.python_version(),
"system": platform.system(),
},
}
def plugins_from(plugins: PluginTypeManager) -> List[Dict[str, str]]:
"""Generate the list of plugins installed."""
return [
{"plugin": name, "version": version}
for name, version in sorted(set(plugins.manager.versions()))
]

View file

@ -1,13 +1,11 @@
"""Option handling and Option management logic."""
import argparse
import contextlib
import enum
import functools
import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import Generator
from typing import List
from typing import Mapping
from typing import Optional
@ -18,6 +16,7 @@ from typing import Type
from typing import Union
from flake8 import utils
from flake8.plugins.finder import Plugins
LOG = logging.getLogger(__name__)
@ -351,6 +350,7 @@ class OptionManager:
),
)
self.parser.add_argument("filenames", nargs="*", metavar="filename")
self.config_options_dict: Dict[str, Option] = {}
self.options: List[Option] = []
self.extended_default_ignore: Set[str] = set()
@ -358,15 +358,32 @@ class OptionManager:
self._current_group: Optional[argparse._ArgumentGroup] = None
@contextlib.contextmanager
def group(self, name: str) -> Generator[None, None, None]:
"""Attach options to an argparse group during this context."""
group = self.parser.add_argument_group(name)
self._current_group, orig_group = group, self._current_group
try:
yield
finally:
self._current_group = orig_group
# TODO: maybe make this a free function to reduce api surface area
def register_plugins(self, plugins: Plugins) -> None:
"""Register the plugin options (if needed)."""
groups: Dict[str, argparse._ArgumentGroup] = {}
def _set_group(name: str) -> None:
try:
self._current_group = groups[name]
except KeyError:
group = self.parser.add_argument_group(name)
self._current_group = groups[name] = group
for loaded in plugins.all_plugins():
add_options = getattr(loaded.obj, "add_options", None)
if add_options:
_set_group(loaded.plugin.package)
add_options(self)
# if the plugin is off by default, disable it!
if getattr(loaded.obj, "off_by_default", False):
self.extend_default_ignore(loaded.entry_name)
else:
self.extend_default_select(loaded.entry_name)
# isn't strictly necessary, but seems cleaner
self._current_group = None
def add_option(self, *args: Any, **kwargs: Any) -> None:
"""Create and register a new option.

View file

@ -0,0 +1,256 @@
"""Functions related to finding and loading plugins."""
import configparser
import inspect
import logging
import sys
from typing import Any
from typing import Dict
from typing import Generator
from typing import Iterable
from typing import List
from typing import NamedTuple
from flake8 import utils
from flake8._compat import importlib_metadata
from flake8.exceptions import FailedToLoadPlugin
LOG = logging.getLogger(__name__)
FLAKE8_GROUPS = frozenset(("flake8.extension", "flake8.report"))
BANNED_PLUGINS = {
"flake8-colors": "4.1",
"flake8-per-file-ignores": "3.7",
}
class Plugin(NamedTuple):
"""A plugin before loading."""
package: str
version: str
entry_point: importlib_metadata.EntryPoint
class LoadedPlugin(NamedTuple):
"""Represents a plugin after being imported."""
plugin: Plugin
obj: Any
parameters: Dict[str, bool]
@property
def entry_name(self) -> str:
"""Return the name given in the packaging metadata."""
return self.plugin.entry_point.name
@property
def display_name(self) -> str:
"""Return the name for use in user-facing / error messages."""
return f"{self.plugin.package}[{self.entry_name}]"
class Checkers(NamedTuple):
"""Classified plugins needed for checking."""
tree: List[LoadedPlugin]
logical_line: List[LoadedPlugin]
physical_line: List[LoadedPlugin]
class Plugins(NamedTuple):
"""Classified plugins."""
checkers: Checkers
reporters: Dict[str, LoadedPlugin]
def all_plugins(self) -> Generator[LoadedPlugin, None, None]:
"""Return an iterator over all :class:`LoadedPlugin`s."""
yield from self.checkers.tree
yield from self.checkers.logical_line
yield from self.checkers.physical_line
yield from self.reporters.values()
def versions_str(self) -> str:
"""Return a user-displayed list of plugin versions."""
return ", ".join(
sorted(
{
f"{loaded.plugin.package}: {loaded.plugin.version}"
for loaded in self.all_plugins()
if loaded.plugin.package not in {"flake8", "local"}
}
)
)
def _flake8_plugins(
eps: Iterable[importlib_metadata.EntryPoint],
name: str,
version: str,
) -> Generator[Plugin, None, None]:
pyflakes_meta = importlib_metadata.distribution("pyflakes").metadata
pycodestyle_meta = importlib_metadata.distribution("pycodestyle").metadata
for ep in eps:
if ep.group not in FLAKE8_GROUPS:
continue
if ep.name == "F":
yield Plugin(pyflakes_meta["name"], pyflakes_meta["version"], ep)
elif ep.name.startswith("pycodestyle"):
yield Plugin(
pycodestyle_meta["name"], pycodestyle_meta["version"], ep
)
else:
yield Plugin(name, version, ep)
def _find_importlib_plugins() -> Generator[Plugin, None, None]:
for dist in importlib_metadata.distributions():
# assigned to prevent continual reparsing
eps = dist.entry_points
# perf: skip parsing `.metadata` (slow) if no entry points match
if not any(ep.group in FLAKE8_GROUPS for ep in eps):
continue
# assigned to prevent continual reparsing
meta = dist.metadata
if meta["name"] in BANNED_PLUGINS:
LOG.warning(
"%s plugin is obsolete in flake8>=%s",
meta["name"],
BANNED_PLUGINS[meta["name"]],
)
continue
elif meta["name"] == "flake8":
# special case flake8 which provides plugins for pyflakes /
# pycodestyle
yield from _flake8_plugins(eps, meta["name"], meta["version"])
continue
for ep in eps:
if ep.group in FLAKE8_GROUPS:
yield Plugin(meta["name"], meta["version"], ep)
def _find_local_plugins(
cfg: configparser.RawConfigParser,
) -> Generator[Plugin, None, None]:
for plugin_type in ("extension", "report"):
group = f"flake8.{plugin_type}"
for plugin_s in utils.parse_comma_separated_list(
cfg.get("flake8:local-plugins", plugin_type, fallback="").strip(),
regexp=utils.LOCAL_PLUGIN_LIST_RE,
):
name, _, entry_str = plugin_s.partition("=")
name, entry_str = name.strip(), entry_str.strip()
ep = importlib_metadata.EntryPoint(name, entry_str, group)
yield Plugin("local", "local", ep)
def find_plugins(cfg: configparser.RawConfigParser) -> List[Plugin]:
"""Discovers all plugins (but does not load them)."""
ret = [*_find_importlib_plugins(), *_find_local_plugins(cfg)]
# for determinism, sort the list
ret.sort()
return ret
def find_local_plugin_paths(
cfg: configparser.RawConfigParser,
cfg_dir: str,
) -> List[str]:
"""Discovers the list of ``flake8:local-plugins`` ``paths``."""
paths_s = cfg.get("flake8:local-plugins", "paths", fallback="").strip()
paths = utils.parse_comma_separated_list(paths_s)
return utils.normalize_paths(paths, cfg_dir)
def _parameters_for(func: Any) -> Dict[str, bool]:
"""Return the parameters for the plugin.
This will inspect the plugin and return either the function parameters
if the plugin is a function or the parameters for ``__init__`` after
``self`` if the plugin is a class.
:returns:
A dictionary mapping the parameter name to whether or not it is
required (a.k.a., is positional only/does not have a default).
"""
is_class = not inspect.isfunction(func)
if is_class:
func = func.__init__
parameters = {
parameter.name: parameter.default is inspect.Parameter.empty
for parameter in inspect.signature(func).parameters.values()
if parameter.kind is inspect.Parameter.POSITIONAL_OR_KEYWORD
}
if is_class:
parameters.pop("self", None)
return parameters
def _load_plugin(plugin: Plugin) -> LoadedPlugin:
try:
obj = plugin.entry_point.load()
except Exception as e:
raise FailedToLoadPlugin(plugin.package, e)
if not callable(obj):
err = TypeError("expected loaded plugin to be callable")
raise FailedToLoadPlugin(plugin.package, err)
return LoadedPlugin(plugin, obj, _parameters_for(obj))
def _import_plugins(
plugins: List[Plugin], paths: List[str]
) -> List[LoadedPlugin]:
sys.path.extend(paths)
return [_load_plugin(p) for p in plugins]
def _classify_plugins(plugins: List[LoadedPlugin]) -> Plugins:
tree = []
logical_line = []
physical_line = []
reporters = {}
for loaded in plugins:
if loaded.plugin.entry_point.group == "flake8.report":
reporters[loaded.entry_name] = loaded
elif "tree" in loaded.parameters:
tree.append(loaded)
elif "logical_line" in loaded.parameters:
logical_line.append(loaded)
elif "physical_line" in loaded.parameters:
physical_line.append(loaded)
else:
raise NotImplementedError(f"what plugin type? {loaded}")
return Plugins(
checkers=Checkers(
tree=tree,
logical_line=logical_line,
physical_line=physical_line,
),
reporters=reporters,
)
def load_plugins(plugins: List[Plugin], paths: List[str]) -> Plugins:
"""Load and classify all flake8 plugins.
- first: extends ``sys.path`` with ``paths`` (to import local plugins)
- next: converts the ``Plugin``s to ``LoadedPlugins``
- finally: classifies plugins into their specific types
"""
return _classify_plugins(_import_plugins(plugins, paths))

View file

@ -1,530 +0,0 @@
"""Plugin loading and management logic and classes."""
import logging
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from flake8 import exceptions
from flake8 import utils
from flake8._compat import importlib_metadata
LOG = logging.getLogger(__name__)
__all__ = ("Checkers", "Plugin", "PluginManager", "ReportFormatters")
NO_GROUP_FOUND = object()
class Plugin:
"""Wrap an EntryPoint from setuptools and other logic."""
def __init__(self, name, entry_point, local=False):
"""Initialize our Plugin.
:param str name:
Name of the entry-point as it was registered with setuptools.
:param entry_point:
EntryPoint returned by setuptools.
:type entry_point:
setuptools.EntryPoint
:param bool local:
Is this a repo-local plugin?
"""
self.name = name
self.entry_point = entry_point
self.local = local
self._plugin: Any = None
self._parameters = None
self._parameter_names: Optional[List[str]] = None
self._group = None
self._plugin_name = None
self._version = None
def __repr__(self) -> str:
"""Provide an easy to read description of the current plugin."""
return 'Plugin(name="{}", entry_point="{}")'.format(
self.name, self.entry_point.value
)
def to_dictionary(self):
"""Convert this plugin to a dictionary."""
return {
"name": self.name,
"parameters": self.parameters,
"parameter_names": self.parameter_names,
"plugin": self.plugin,
"plugin_name": self.plugin_name,
}
def is_in_a_group(self):
"""Determine if this plugin is in a group.
:returns:
True if the plugin is in a group, otherwise False.
:rtype:
bool
"""
return self.group() is not None
def group(self):
"""Find and parse the group the plugin is in."""
if self._group is None:
name = self.name.split(".", 1)
if len(name) > 1:
self._group = name[0]
else:
self._group = NO_GROUP_FOUND
if self._group is NO_GROUP_FOUND:
return None
return self._group
@property
def parameters(self):
"""List of arguments that need to be passed to the plugin."""
if self._parameters is None:
self._parameters = utils.parameters_for(self)
return self._parameters
@property
def parameter_names(self) -> List[str]:
"""List of argument names that need to be passed to the plugin."""
if self._parameter_names is None:
self._parameter_names = list(self.parameters)
return self._parameter_names
@property
def plugin(self):
"""Load and return the plugin associated with the entry-point.
This property implicitly loads the plugin and then caches it.
"""
self.load_plugin()
return self._plugin
@property
def version(self) -> str:
"""Return the version of the plugin."""
version = self._version
if version is None:
if self.is_in_a_group():
version = self._version = version_for(self)
else:
version = self._version = self.plugin.version
return version
@property
def plugin_name(self):
"""Return the name of the plugin."""
if self._plugin_name is None:
if self.is_in_a_group():
self._plugin_name = self.group()
else:
self._plugin_name = self.plugin.name
return self._plugin_name
@property
def off_by_default(self):
"""Return whether the plugin is ignored by default."""
return getattr(self.plugin, "off_by_default", False)
def execute(self, *args, **kwargs):
r"""Call the plugin with \*args and \*\*kwargs."""
return self.plugin(*args, **kwargs) # pylint: disable=not-callable
def _load(self):
self._plugin = self.entry_point.load()
if not callable(self._plugin):
msg = (
f"Plugin {self._plugin!r} is not a callable. It might be "
f"written for an older version of flake8 and might not work "
f"with this version"
)
LOG.critical(msg)
raise TypeError(msg)
def load_plugin(self):
"""Retrieve the plugin for this entry-point.
This loads the plugin, stores it on the instance and then returns it.
It does not reload it after the first time, it merely returns the
cached plugin.
:returns:
Nothing
"""
if self._plugin is None:
LOG.info('Loading plugin "%s" from entry-point.', self.name)
try:
self._load()
except Exception as load_exception:
LOG.exception(load_exception)
failed_to_load = exceptions.FailedToLoadPlugin(
plugin_name=self.name, exception=load_exception
)
LOG.critical(str(failed_to_load))
raise failed_to_load
def enable(self, optmanager, options=None):
"""Remove plugin name from the default ignore list."""
optmanager.remove_from_default_ignore([self.name])
optmanager.extend_default_select([self.name])
if not options:
return
try:
options.ignore.remove(self.name)
except (ValueError, KeyError):
LOG.debug(
"Attempted to remove %s from the ignore list but it was "
"not a member of the list.",
self.name,
)
def disable(self, optmanager):
"""Add the plugin name to the default ignore list."""
optmanager.extend_default_ignore([self.name])
def provide_options(self, optmanager, options, extra_args):
"""Pass the parsed options and extra arguments to the plugin."""
parse_options = getattr(self.plugin, "parse_options", None)
if parse_options is not None:
LOG.debug('Providing options to plugin "%s".', self.name)
try:
parse_options(optmanager, options, extra_args)
except TypeError:
parse_options(options)
if self.name in options.enable_extensions:
self.enable(optmanager, options)
def register_options(self, optmanager):
"""Register the plugin's command-line options on the OptionManager.
:param optmanager:
Instantiated OptionManager to register options on.
:type optmanager:
flake8.options.manager.OptionManager
:returns:
Nothing
"""
add_options = getattr(self.plugin, "add_options", None)
if add_options is not None:
LOG.debug(
'Registering options from plugin "%s" on OptionManager %r',
self.name,
optmanager,
)
with optmanager.group(self.plugin_name):
add_options(optmanager)
if self.off_by_default:
self.disable(optmanager)
class PluginManager: # pylint: disable=too-few-public-methods
"""Find and manage plugins consistently."""
def __init__(
self, namespace: str, local_plugins: Optional[List[str]] = None
) -> None:
"""Initialize the manager.
:param str namespace:
Namespace of the plugins to manage, e.g., 'flake8.extension'.
:param list local_plugins:
Plugins from config (as "X = path.to:Plugin" strings).
"""
self.namespace = namespace
self.plugins: Dict[str, Plugin] = {}
self.names: List[str] = []
self._load_local_plugins(local_plugins or [])
self._load_entrypoint_plugins()
def _load_local_plugins(self, local_plugins):
"""Load local plugins from config.
:param list local_plugins:
Plugins from config (as "X = path.to:Plugin" strings).
"""
for plugin_str in local_plugins:
name, _, entry_str = plugin_str.partition("=")
name, entry_str = name.strip(), entry_str.strip()
entry_point = importlib_metadata.EntryPoint(
name, entry_str, self.namespace
)
self._load_plugin_from_entrypoint(entry_point, local=True)
def _load_entrypoint_plugins(self):
LOG.info('Loading entry-points for "%s".', self.namespace)
eps = importlib_metadata.entry_points().get(self.namespace, ())
# python2.7 occasionally gives duplicate results due to redundant
# `local/lib` -> `../lib` symlink on linux in virtualenvs so we
# eliminate duplicates here
for entry_point in sorted(frozenset(eps)):
if entry_point.name == "per-file-ignores":
LOG.warning(
"flake8-per-file-ignores plugin is incompatible with "
"flake8>=3.7 (which implements per-file-ignores itself)."
)
continue
elif entry_point.name == "flake8-colors":
LOG.warning(
"flake8-colors plugin is incompatible with "
"flake8>=4.1 (which implements colors itself)."
)
continue
self._load_plugin_from_entrypoint(entry_point)
def _load_plugin_from_entrypoint(self, entry_point, local=False):
"""Load a plugin from a setuptools EntryPoint.
:param EntryPoint entry_point:
EntryPoint to load plugin from.
:param bool local:
Is this a repo-local plugin?
"""
name = entry_point.name
self.plugins[name] = Plugin(name, entry_point, local=local)
self.names.append(name)
LOG.debug('Loaded %r for plugin "%s".', self.plugins[name], name)
def map(self, func, *args, **kwargs):
r"""Call ``func`` with the plugin and \*args and \**kwargs after.
This yields the return value from ``func`` for each plugin.
:param collections.Callable func:
Function to call with each plugin. Signature should at least be:
.. code-block:: python
def myfunc(plugin):
pass
Any extra positional or keyword arguments specified with map will
be passed along to this function after the plugin. The plugin
passed is a :class:`~flake8.plugins.manager.Plugin`.
:param args:
Positional arguments to pass to ``func`` after each plugin.
:param kwargs:
Keyword arguments to pass to ``func`` after each plugin.
"""
for name in self.names:
yield func(self.plugins[name], *args, **kwargs)
def versions(self):
# () -> (str, str)
"""Generate the versions of plugins.
:returns:
Tuples of the plugin_name and version
:rtype:
tuple
"""
plugins_seen: Set[str] = set()
for entry_point_name in self.names:
plugin = self.plugins[entry_point_name]
plugin_name = plugin.plugin_name
if plugin.plugin_name in plugins_seen:
continue
plugins_seen.add(plugin_name)
yield (plugin_name, plugin.version)
def version_for(plugin):
# (Plugin) -> Optional[str]
"""Determine the version of a plugin by its module.
:param plugin:
The loaded plugin
:type plugin:
Plugin
:returns:
version string for the module
:rtype:
str
"""
module_name = plugin.plugin.__module__
try:
module = __import__(module_name)
except ImportError:
return None
return getattr(module, "__version__", None)
class PluginTypeManager:
"""Parent class for most of the specific plugin types."""
namespace: str
def __init__(self, local_plugins=None):
"""Initialize the plugin type's manager.
:param list local_plugins:
Plugins from config file instead of entry-points
"""
self.manager = PluginManager(
self.namespace, local_plugins=local_plugins
)
self.plugins_loaded = False
def __contains__(self, name):
"""Check if the entry-point name is in this plugin type manager."""
LOG.debug('Checking for "%s" in plugin type manager.', name)
return name in self.plugins
def __getitem__(self, name):
"""Retrieve a plugin by its name."""
LOG.debug('Retrieving plugin for "%s".', name)
return self.plugins[name]
def get(self, name, default=None):
"""Retrieve the plugin referred to by ``name`` or return the default.
:param str name:
Name of the plugin to retrieve.
:param default:
Default value to return.
:returns:
Plugin object referred to by name, if it exists.
:rtype:
:class:`Plugin`
"""
if name in self:
return self[name]
return default
@property
def names(self):
"""Proxy attribute to underlying manager."""
return self.manager.names
@property
def plugins(self):
"""Proxy attribute to underlying manager."""
return self.manager.plugins
@staticmethod
def _generate_call_function(method_name, optmanager, *args, **kwargs):
def generated_function(plugin):
method = getattr(plugin, method_name, None)
if method is not None and callable(method):
return method(optmanager, *args, **kwargs)
return generated_function
def load_plugins(self):
"""Load all plugins of this type that are managed by this manager."""
if self.plugins_loaded:
return
for plugin in self.plugins.values():
plugin.load_plugin()
# Do not set plugins_loaded if we run into an exception
self.plugins_loaded = True
def register_options(self, optmanager):
"""Register all of the checkers' options to the OptionManager."""
self.load_plugins()
call_register_options = self._generate_call_function(
"register_options", optmanager
)
list(self.manager.map(call_register_options))
def provide_options(self, optmanager, options, extra_args):
"""Provide parsed options and extra arguments to the plugins."""
call_provide_options = self._generate_call_function(
"provide_options", optmanager, options, extra_args
)
list(self.manager.map(call_provide_options))
class Checkers(PluginTypeManager):
"""All of the checkers registered through entry-points or config."""
namespace = "flake8.extension"
def checks_expecting(self, argument_name):
"""Retrieve checks that expect an argument with the specified name.
Find all checker plugins that are expecting a specific argument.
"""
for plugin in self.plugins.values():
if argument_name == plugin.parameter_names[0]:
yield plugin
def to_dictionary(self):
"""Return a dictionary of AST and line-based plugins."""
return {
"ast_plugins": [
plugin.to_dictionary() for plugin in self.ast_plugins
],
"logical_line_plugins": [
plugin.to_dictionary() for plugin in self.logical_line_plugins
],
"physical_line_plugins": [
plugin.to_dictionary() for plugin in self.physical_line_plugins
],
}
def register_options(self, optmanager):
"""Register all of the checkers' options to the OptionManager.
This also ensures that plugins that are not part of a group and are
enabled by default are enabled on the option manager.
"""
# NOTE(sigmavirus24) We reproduce a little of
# PluginTypeManager.register_options to reduce the number of times
# that we loop over the list of plugins. Instead of looping twice,
# option registration and enabling the plugin, we loop once with one
# function to map over the plugins.
self.load_plugins()
call_register_options = self._generate_call_function(
"register_options", optmanager
)
def register_and_enable(plugin):
call_register_options(plugin)
if plugin.group() is None and not plugin.off_by_default:
plugin.enable(optmanager)
list(self.manager.map(register_and_enable))
@property
def ast_plugins(self):
"""List of plugins that expect the AST tree."""
plugins = getattr(self, "_ast_plugins", [])
if not plugins:
plugins = list(self.checks_expecting("tree"))
self._ast_plugins = plugins
return plugins
@property
def logical_line_plugins(self):
"""List of plugins that expect the logical lines."""
plugins = getattr(self, "_logical_line_plugins", [])
if not plugins:
plugins = list(self.checks_expecting("logical_line"))
self._logical_line_plugins = plugins
return plugins
@property
def physical_line_plugins(self):
"""List of plugins that expect the physical lines."""
plugins = getattr(self, "_physical_line_plugins", [])
if not plugins:
plugins = list(self.checks_expecting("physical_line"))
self._physical_line_plugins = plugins
return plugins
class ReportFormatters(PluginTypeManager):
"""All of the report formatters registered through entry-points/config."""
namespace = "flake8.report"

View file

@ -69,8 +69,6 @@ FLAKE8_PYFLAKES_CODES = {
class FlakesChecker(pyflakes.checker.Checker):
"""Subclass the Pyflakes checker to conform with the flake8 API."""
name = "pyflakes"
version = pyflakes.__version__
with_doctest = False
include_in_doctest: List[str] = []
exclude_from_doctest: List[str] = []

View file

@ -0,0 +1,41 @@
"""Functions for construcing the requested report plugin."""
import argparse
import logging
from typing import Dict
from flake8.formatting.base import BaseFormatter
from flake8.plugins.finder import LoadedPlugin
LOG = logging.getLogger(__name__)
def make(
reporters: Dict[str, LoadedPlugin],
options: argparse.Namespace,
) -> BaseFormatter:
"""Make the formatter from the requested user options.
- if :option:`flake8 --quiet` is specified, return the ``quiet-filename``
formatter.
- if :option:`flake8 --quiet` is specified at least twice, return the
``quiet-nothing`` formatter.
- otherwise attempt to return the formatter by name.
- failing that, assume it is a format string and return the ``default``
formatter.
"""
format_name = options.format
if options.quiet == 1:
format_name = "quiet-filename"
elif options.quiet >= 2:
format_name = "quiet-nothing"
try:
format_plugin = reporters[format_name]
except KeyError:
LOG.warning(
"%r is an unknown formatter. Falling back to default.",
format_name,
)
format_plugin = reporters["default"]
return format_plugin.obj(options)

View file

@ -14,6 +14,7 @@ from typing import Tuple
import flake8
from flake8 import defaults
from flake8 import utils
from flake8.plugins.finder import LoadedPlugin
LOG = logging.getLogger(__name__)
NEWLINE = frozenset([tokenize.NL, tokenize.NEWLINE])
@ -160,11 +161,11 @@ class FileProcessor:
if self.blank_before < self.blank_lines:
self.blank_before = self.blank_lines
def update_checker_state_for(self, plugin: Dict[str, Any]) -> None:
def update_checker_state_for(self, plugin: LoadedPlugin) -> None:
"""Update the checker_state attribute for the plugin."""
if "checker_state" in plugin["parameters"]:
if "checker_state" in plugin.parameters:
self.checker_state = self._checker_states.setdefault(
plugin["name"], {}
plugin.entry_name, {}
)
def next_logical_line(self) -> None:

View file

@ -2,7 +2,6 @@
import collections
import fnmatch as _fnmatch
import functools
import inspect
import io
import logging
import os
@ -18,14 +17,10 @@ from typing import Pattern
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union
from flake8 import exceptions
if TYPE_CHECKING:
from flake8.plugins.manager import Plugin
DIFF_HUNK_REGEXP = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? @@.*$")
COMMA_SEPARATED_LIST_RE = re.compile(r"[,\s]")
LOCAL_PLUGIN_LIST_RE = re.compile(r"[,\t\n\r\f\v]")
@ -310,40 +305,6 @@ def fnmatch(filename: str, patterns: Sequence[str]) -> bool:
return any(_fnmatch.fnmatch(filename, pattern) for pattern in patterns)
def parameters_for(plugin: "Plugin") -> Dict[str, bool]:
"""Return the parameters for the plugin.
This will inspect the plugin and return either the function parameters
if the plugin is a function or the parameters for ``__init__`` after
``self`` if the plugin is a class.
:param plugin:
The internal plugin object.
:type plugin:
flake8.plugins.manager.Plugin
:returns:
A dictionary mapping the parameter name to whether or not it is
required (a.k.a., is positional only/does not have a default).
:rtype:
dict([(str, bool)])
"""
func = plugin.plugin
is_class = not inspect.isfunction(func)
if is_class: # The plugin is a class
func = plugin.plugin.__init__
parameters = {
parameter.name: parameter.default is parameter.empty
for parameter in inspect.signature(func).parameters.values()
if parameter.kind == parameter.POSITIONAL_OR_KEYWORD
}
if is_class:
parameters.pop("self", None)
return parameters
def matches_filename(
path: str,
patterns: Sequence[str],