add a --require-plugins option

This commit is contained in:
Anthony Sottile 2022-01-22 13:58:46 -05:00
parent ca573a7ccf
commit d03b9c97cc
10 changed files with 303 additions and 72 deletions

View file

@ -212,7 +212,12 @@ def get_style_guide(**kwargs: Any) -> StyleGuide:
isolated=prelim_opts.isolated,
)
application.find_plugins(cfg, cfg_dir, prelim_opts.enable_extensions)
application.find_plugins(
cfg,
cfg_dir,
enable_extensions=prelim_opts.enable_extensions,
require_plugins=prelim_opts.require_plugins,
)
application.register_plugin_options()
application.parse_configuration_and_cli(cfg, cfg_dir, remaining_args)
# We basically want application.initialize to be called but with these

View file

@ -111,14 +111,21 @@ class Application:
self,
cfg: configparser.RawConfigParser,
cfg_dir: str,
*,
enable_extensions: Optional[str],
require_plugins: Optional[str],
) -> None:
"""Find and load the plugins for this application.
Set :attr:`plugins` based on loaded plugins.
"""
opts = finder.parse_plugin_options(cfg, cfg_dir, enable_extensions)
raw = finder.find_plugins(cfg)
opts = finder.parse_plugin_options(
cfg,
cfg_dir,
enable_extensions=enable_extensions,
require_plugins=require_plugins,
)
raw = finder.find_plugins(cfg, opts)
self.plugins = finder.load_plugins(raw, opts)
def register_plugin_options(self) -> None:
@ -295,7 +302,12 @@ class Application:
isolated=prelim_opts.isolated,
)
self.find_plugins(cfg, cfg_dir, prelim_opts.enable_extensions)
self.find_plugins(
cfg,
cfg_dir,
enable_extensions=prelim_opts.enable_extensions,
require_plugins=prelim_opts.require_plugins,
)
self.register_plugin_options()
self.parse_configuration_and_cli(cfg, cfg_dir, remaining_args)
self.make_formatter()

View file

@ -68,6 +68,11 @@ def stage1_arg_parser() -> argparse.ArgumentParser:
"by default",
)
parser.add_argument(
"--require-plugins",
help="Require specific plugins to be installed before running",
)
return parser

View file

@ -15,6 +15,7 @@ from typing import Tuple
from flake8 import utils
from flake8._compat import importlib_metadata
from flake8.exceptions import ExecutionError
from flake8.exceptions import FailedToLoadPlugin
LOG = logging.getLogger(__name__)
@ -88,6 +89,65 @@ class Plugins(NamedTuple):
)
class PluginOptions(NamedTuple):
"""Options related to plugin loading."""
local_plugin_paths: Tuple[str, ...]
enable_extensions: FrozenSet[str]
require_plugins: FrozenSet[str]
@classmethod
def blank(cls) -> "PluginOptions":
"""Make a blank PluginOptions, mostly used for tests."""
return cls(
local_plugin_paths=(),
enable_extensions=frozenset(),
require_plugins=frozenset(),
)
def _parse_option(
cfg: configparser.RawConfigParser,
cfg_opt_name: str,
opt: Optional[str],
) -> List[str]:
# specified on commandline: use that
if opt is not None:
return utils.parse_comma_separated_list(opt)
else:
# ideally this would reuse our config parsing framework but we need to
# parse this from preliminary options before plugins are enabled
for opt_name in (cfg_opt_name, cfg_opt_name.replace("_", "-")):
val = cfg.get("flake8", opt_name, fallback=None)
if val is not None:
return utils.parse_comma_separated_list(val)
else:
return []
def parse_plugin_options(
cfg: configparser.RawConfigParser,
cfg_dir: str,
*,
enable_extensions: Optional[str],
require_plugins: Optional[str],
) -> PluginOptions:
"""Parse plugin loading related options."""
paths_s = cfg.get("flake8:local-plugins", "paths", fallback="").strip()
paths = utils.parse_comma_separated_list(paths_s)
paths = utils.normalize_paths(paths, cfg_dir)
return PluginOptions(
local_plugin_paths=tuple(paths),
enable_extensions=frozenset(
_parse_option(cfg, "enable_extensions", enable_extensions),
),
require_plugins=frozenset(
_parse_option(cfg, "require_plugins", require_plugins),
),
)
def _flake8_plugins(
eps: Iterable[importlib_metadata.EntryPoint],
name: str,
@ -160,67 +220,40 @@ def _find_local_plugins(
yield Plugin("local", "local", ep)
def find_plugins(cfg: configparser.RawConfigParser) -> List[Plugin]:
def _check_required_plugins(
plugins: List[Plugin],
expected: FrozenSet[str],
) -> None:
plugin_names = {
utils.normalize_pypi_name(plugin.package) for plugin in plugins
}
expected_names = {utils.normalize_pypi_name(name) for name in expected}
missing_plugins = expected_names - plugin_names
if missing_plugins:
raise ExecutionError(
f"required plugins were not installed!\n"
f"- installed: {', '.join(sorted(plugin_names))}\n"
f"- expected: {', '.join(sorted(expected_names))}\n"
f"- missing: {', '.join(sorted(missing_plugins))}"
)
def find_plugins(
cfg: configparser.RawConfigParser,
opts: PluginOptions,
) -> List[Plugin]:
"""Discovers all plugins (but does not load them)."""
ret = [*_find_importlib_plugins(), *_find_local_plugins(cfg)]
# for determinism, sort the list
ret.sort()
_check_required_plugins(ret, opts.require_plugins)
return ret
class PluginOptions(NamedTuple):
"""Options related to plugin loading."""
local_plugin_paths: Tuple[str, ...]
enable_extensions: FrozenSet[str]
# TODO: more options here!
# require_plugins: Tuple[str, ...]
@classmethod
def blank(cls) -> "PluginOptions":
"""Make a blank PluginOptions, mostly used for tests."""
return cls(local_plugin_paths=(), enable_extensions=frozenset())
def _parse_option(
cfg: configparser.RawConfigParser,
cfg_opt_name: str,
opt: Optional[str],
) -> List[str]:
# specified on commandline: use that
if opt is not None:
return utils.parse_comma_separated_list(opt)
else:
# ideally this would reuse our config parsing framework but we need to
# parse this from preliminary options before plugins are enabled
for opt_name in (cfg_opt_name, cfg_opt_name.replace("_", "-")):
val = cfg.get("flake8", opt_name, fallback=None)
if val is not None:
return utils.parse_comma_separated_list(val)
else:
return []
def parse_plugin_options(
cfg: configparser.RawConfigParser,
cfg_dir: str,
enable_extensions: Optional[str],
) -> PluginOptions:
"""Parse plugin loading related options."""
paths_s = cfg.get("flake8:local-plugins", "paths", fallback="").strip()
paths = utils.parse_comma_separated_list(paths_s)
paths = utils.normalize_paths(paths, cfg_dir)
return PluginOptions(
local_plugin_paths=tuple(paths),
enable_extensions=frozenset(
_parse_option(cfg, "enable_extensions", enable_extensions),
),
)
def _parameters_for(func: Any) -> Dict[str, bool]:
"""Return the parameters for the plugin.

View file

@ -25,6 +25,7 @@ from flake8 import exceptions
DIFF_HUNK_REGEXP = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? @@.*$")
COMMA_SEPARATED_LIST_RE = re.compile(r"[,\s]")
LOCAL_PLUGIN_LIST_RE = re.compile(r"[,\t\n\r\f\v]")
NORMALIZE_PACKAGE_NAME_RE = re.compile(r"[-_.]+")
def parse_comma_separated_list(
@ -343,3 +344,8 @@ def get_python_version() -> str:
platform.python_version(),
platform.system(),
)
def normalize_pypi_name(s: str) -> str:
"""Normalize a distribution name according to PEP 503."""
return NORMALIZE_PACKAGE_NAME_RE.sub("-", s).lower()