Add --required-plugins and --allowed-plugins

Closes #283
Closes #488
This commit is contained in:
Ian Stapleton Cordasco 2021-11-05 10:10:52 -05:00
parent 05cae7e046
commit 5694f6f3af
No known key found for this signature in database
GPG key ID: 656D3395E4A9791A
6 changed files with 172 additions and 7 deletions

View file

@ -38,7 +38,7 @@ def get_style_guide(**kwargs):
ignore_config_files=prelim_opts.isolated, ignore_config_files=prelim_opts.isolated,
) )
application.find_plugins(config_finder) application.find_plugins(config_finder, prelim_opts)
application.register_plugin_options() application.register_plugin_options()
application.parse_configuration_and_cli( application.parse_configuration_and_cli(
config_finder, config_finder,

View file

@ -1,5 +1,6 @@
"""Exception classes for all of Flake8.""" """Exception classes for all of Flake8."""
from typing import Dict from typing import Dict
from typing import List
class Flake8Exception(Exception): class Flake8Exception(Exception):
@ -69,3 +70,24 @@ class PluginExecutionFailed(Flake8Exception):
"name": self.plugin["plugin_name"], "name": self.plugin["plugin_name"],
"exc": self.original_exception, "exc": self.original_exception,
} }
class PluginMissingError(Flake8Exception):
"""A plugin that was required was not found."""
FORMAT = "User required %(plugins)s but %(missing)s was not found."
def __init__(
self, required_plugins: List[str], missing_plugins: List[str]
) -> None:
"""Store the information passed in to format the exception message."""
self.required_plugins = required_plugins
self.missing_plugins = missing_plugins
super().__init__(required_plugins, missing_plugins)
def __str__(self) -> str:
"""Format our exception message."""
return self.FORMAT % {
"plugins": ", ".join(self.required_plugins),
"missing": ", ".join(self.missing_plugins),
}

View file

@ -141,7 +141,11 @@ class Application:
(self.result_count > 0) or self.catastrophic_failure (self.result_count > 0) or self.catastrophic_failure
) )
def find_plugins(self, config_finder: config.ConfigFileFinder) -> None: def find_plugins(
self,
config_finder: config.ConfigFileFinder,
prelim_opts: argparse.Namespace,
) -> None:
"""Find and load the plugins for this application. """Find and load the plugins for this application.
Set the :attr:`check_plugins` and :attr:`formatting_plugins` attributes Set the :attr:`check_plugins` and :attr:`formatting_plugins` attributes
@ -149,8 +153,16 @@ class Application:
:param config.ConfigFileFinder config_finder: :param config.ConfigFileFinder config_finder:
The finder for finding and reading configuration files. The finder for finding and reading configuration files.
:param argparse.Namespace prelim_opts:
The options parsed preliminarily from the CLI
""" """
local_plugins = config.get_local_plugins(config_finder) local_plugins = config.get_local_plugins(config_finder)
(
allowed_plugins,
required_plugins,
) = config.get_plugin_allowlist_and_requirements(
config_finder, prelim_opts
)
sys.path.extend(local_plugins.paths) sys.path.extend(local_plugins.paths)
@ -160,7 +172,11 @@ class Application:
local_plugins.report local_plugins.report
) )
self.check_plugins.load_plugins() try:
self.check_plugins.load_plugins(allowed_plugins, required_plugins)
except exceptions.PluginMissingError as e:
print(f"Error: {e!s}")
raise SystemExit(2)
self.formatting_plugins.load_plugins() self.formatting_plugins.load_plugins()
def register_plugin_options(self) -> None: def register_plugin_options(self) -> None:
@ -340,7 +356,7 @@ class Application:
config_file=prelim_opts.config, config_file=prelim_opts.config,
ignore_config_files=prelim_opts.isolated, ignore_config_files=prelim_opts.isolated,
) )
self.find_plugins(config_finder) self.find_plugins(config_finder, prelim_opts)
self.register_plugin_options() self.register_plugin_options()
self.parse_configuration_and_cli( self.parse_configuration_and_cli(
config_finder, config_finder,

View file

@ -59,6 +59,23 @@ def register_preliminary_options(parser: argparse.ArgumentParser) -> None:
help="Ignore all configuration files.", help="Ignore all configuration files.",
) )
add_argument(
"--allowed-plugins",
default=None,
# parse_from_config=True,
# comma_separated_list=True,
help="Which plugins are allowed to run from the environment",
)
add_argument(
"--required-plugins",
default=None,
# parse_from_config=True,
# comma_separated_list=True,
help="Which plugins are required for linting. Exits if not all are "
"present.",
)
class JobsArgument: class JobsArgument:
"""Type callback for the --jobs argument.""" """Type callback for the --jobs argument."""

View file

@ -1,4 +1,5 @@
"""Config handling logic for Flake8.""" """Config handling logic for Flake8."""
import argparse
import collections import collections
import configparser import configparser
import logging import logging
@ -273,7 +274,7 @@ def get_local_plugins(config_finder):
if config_finder.ignore_config_files: if config_finder.ignore_config_files:
LOG.debug( LOG.debug(
"Refusing to look for local plugins in configuration" "Refusing to look for local plugins in configuration"
"files due to user-requested isolation" " files due to user-requested isolation"
) )
return local_plugins return local_plugins
@ -315,4 +316,87 @@ def get_local_plugins(config_finder):
return local_plugins return local_plugins
def get_plugin_allowlist_and_requirements(
config_finder: ConfigFileFinder, preliminary_opts: argparse.Namespace
) -> Tuple[List[str], List[str]]:
"""Get allowed and required plugin lists from config.
:param config_finder:
The config file finder to use.
:type config_finder:
:class:`~flake8.options.config.ConfigFileFinder`
:param preliminary_opts:
The config file finder to use.
:type preliminary_opts:
:class:`~argparse.Namespace`
:returns:
tuple of the allowed and required plugin lists
"""
read_allowed_plugins_from_config = True
read_required_plugins_from_config = True
allowed_plugins: List[str] = []
required_plugins: List[str] = []
return_tuple: Tuple[List[str], List[str]] = (
allowed_plugins,
required_plugins,
)
if preliminary_opts.allowed_plugins is not None:
allowed_plugins.extend(
utils.parse_comma_separated_list(preliminary_opts.allowed_plugins)
)
read_allowed_plugins_from_config = False
if preliminary_opts.required_plugins is not None:
required_plugins.extend(
utils.parse_comma_separated_list(preliminary_opts.required_plugins)
)
read_required_plugins_from_config = False
if config_finder.ignore_config_files:
LOG.debug(
"Refusing to look for plugin configuration in configuration"
" files due to user-requested isolation"
)
return return_tuple
if (
not read_allowed_plugins_from_config
and not read_required_plugins_from_config
):
LOG.debug("Found --allowed-plugins and --required-plugins")
return return_tuple
if config_finder.config_file:
LOG.debug(
'Reading local plugins only from "%s" specified via '
"--config by the user",
config_finder.config_file,
)
config = config_finder.cli_config(config_finder.config_file)
config_files = [config_finder.config_file]
else:
config, config_files = config_finder.local_configs_with_files()
section = f"{config_finder.program_name}"
if (
config.has_option(section, "allowed-plugins")
and read_allowed_plugins_from_config
):
allowed_plugins_str = config.get(section, "allowed-plugins").strip()
allowed_plugins.extend(
utils.parse_comma_separated_list(allowed_plugins_str)
)
if (
config.has_option(section, "required-plugins")
and read_required_plugins_from_config
):
required_plugins_str = config.get(section, "required-plugins").strip()
required_plugins.extend(
utils.parse_comma_separated_list(required_plugins_str)
)
return return_tuple
LocalPlugins = collections.namedtuple("LocalPlugins", "extension report paths") LocalPlugins = collections.namedtuple("LocalPlugins", "extension report paths")

View file

@ -1,6 +1,7 @@
"""Plugin loading and management logic and classes.""" """Plugin loading and management logic and classes."""
import logging import logging
from typing import Any from typing import Any
from typing import cast
from typing import Dict from typing import Dict
from typing import List from typing import List
from typing import Optional from typing import Optional
@ -410,13 +411,38 @@ class PluginTypeManager:
return generated_function return generated_function
def load_plugins(self): def load_plugins(
"""Load all plugins of this type that are managed by this manager.""" self,
allowed_plugins: Optional[List[str]] = None,
required_plugins: Optional[List[str]] = None,
) -> None:
"""Load all plugins of this type that are managed by this manager.
:param allowed_plugins:
This is the list of plugins allowed to be loaded
:param required_plugins:
This is the list of plugins required by the user
:raises PluginMissingError:
If a required plugin is missing
"""
if self.plugins_loaded: if self.plugins_loaded:
return return
requireset = set(required_plugins or [])
allowset = set(allowed_plugins or [])
loaded = set()
for plugin in self.plugins.values(): for plugin in self.plugins.values():
if allowset and plugin.name not in allowset:
continue
plugin.load_plugin() plugin.load_plugin()
loaded.add(plugin.name)
missing = requireset.difference(loaded)
if requireset and missing:
required_plugins = cast(List[str], required_plugins)
raise exceptions.PluginMissingError(
required_plugins, sorted(missing)
)
# Do not set plugins_loaded if we run into an exception # Do not set plugins_loaded if we run into an exception
self.plugins_loaded = True self.plugins_loaded = True