refactor and simplify configuration loading

This commit is contained in:
Anthony Sottile 2021-11-22 19:42:50 -05:00
parent dc9b7eb3e4
commit 65c893728e
20 changed files with 351 additions and 883 deletions

View file

@ -31,19 +31,16 @@ def get_style_guide(**kwargs):
application = app.Application()
prelim_opts, remaining_args = application.parse_preliminary_options([])
flake8.configure_logging(prelim_opts.verbose, prelim_opts.output_file)
config_finder = config.ConfigFileFinder(
application.program,
prelim_opts.append_config,
config_file=prelim_opts.config,
ignore_config_files=prelim_opts.isolated,
cfg, cfg_dir = config.load_config(
config=prelim_opts.config,
extra=prelim_opts.append_config,
isolated=prelim_opts.isolated,
)
application.find_plugins(config_finder)
application.find_plugins(cfg, cfg_dir)
application.register_plugin_options()
application.parse_configuration_and_cli(
config_finder,
remaining_args,
)
application.parse_configuration_and_cli(cfg, cfg_dir, remaining_args)
# We basically want application.initialize to be called but with these
# options set instead before we make our formatter, notifier, internal
# style guide and file checker manager.

View file

@ -1,5 +1,6 @@
"""Module containing the application logic for Flake8."""
import argparse
import configparser
import logging
import sys
import time
@ -130,24 +131,35 @@ class Application:
else:
return int((self.result_count > 0) or self.catastrophic_failure)
def find_plugins(self, config_finder: config.ConfigFileFinder) -> None:
def find_plugins(
self,
cfg: configparser.RawConfigParser,
cfg_dir: str,
) -> None:
"""Find and load the plugins for this application.
Set the :attr:`check_plugins` and :attr:`formatting_plugins` attributes
based on the discovered plugins found.
:param config.ConfigFileFinder config_finder:
The finder for finding and reading configuration files.
"""
local_plugins = config.get_local_plugins(config_finder)
sys.path.extend(local_plugins.paths)
self.check_plugins = plugin_manager.Checkers(local_plugins.extension)
self.formatting_plugins = plugin_manager.ReportFormatters(
local_plugins.report
# TODO: move to src/flake8/plugins/finder.py
extension_local = utils.parse_comma_separated_list(
cfg.get("flake8:local-plugins", "extension", fallback="").strip(),
regexp=utils.LOCAL_PLUGIN_LIST_RE,
)
report_local = utils.parse_comma_separated_list(
cfg.get("flake8:local-plugins", "report", fallback="").strip(),
regexp=utils.LOCAL_PLUGIN_LIST_RE,
)
paths_s = cfg.get("flake8:local-plugins", "paths", fallback="").strip()
local_paths = utils.parse_comma_separated_list(paths_s)
local_paths = utils.normalize_paths(local_paths, cfg_dir)
sys.path.extend(local_paths)
self.check_plugins = plugin_manager.Checkers(extension_local)
self.formatting_plugins = plugin_manager.ReportFormatters(report_local)
self.check_plugins.load_plugins()
self.formatting_plugins.load_plugins()
@ -162,19 +174,15 @@ class Application:
def parse_configuration_and_cli(
self,
config_finder: config.ConfigFileFinder,
cfg: configparser.RawConfigParser,
cfg_dir: str,
argv: List[str],
) -> None:
"""Parse configuration files and the CLI options.
:param config.ConfigFileFinder config_finder:
The finder for finding and reading configuration files.
:param list argv:
Command-line arguments passed in directly.
"""
"""Parse configuration files and the CLI options."""
self.options = aggregator.aggregate_options(
self.option_manager,
config_finder,
cfg,
cfg_dir,
argv,
)
@ -329,18 +337,16 @@ class Application:
# our legacy API calls to these same methods.
prelim_opts, remaining_args = self.parse_preliminary_options(argv)
flake8.configure_logging(prelim_opts.verbose, prelim_opts.output_file)
config_finder = config.ConfigFileFinder(
self.program,
prelim_opts.append_config,
config_file=prelim_opts.config,
ignore_config_files=prelim_opts.isolated,
cfg, cfg_dir = config.load_config(
config=prelim_opts.config,
extra=prelim_opts.append_config,
isolated=prelim_opts.isolated,
)
self.find_plugins(config_finder)
self.find_plugins(cfg, cfg_dir)
self.register_plugin_options()
self.parse_configuration_and_cli(
config_finder,
remaining_args,
)
self.parse_configuration_and_cli(cfg, cfg_dir, remaining_args)
self.make_formatter()
self.make_guide()
self.make_file_checker_manager()

View file

@ -39,6 +39,7 @@ def register_preliminary_options(parser: argparse.ArgumentParser) -> None:
add_argument(
"--append-config",
action="append",
default=[],
help="Provide extra config files to parse in addition to the files "
"found by Flake8 by default. These files are the last ones read "
"and so they take the highest precedence when multiple files "

View file

@ -4,8 +4,10 @@ This holds the logic that uses the collected and merged config files and
applies the user-specified command-line configuration on top of it.
"""
import argparse
import configparser
import logging
from typing import List
from typing import Optional
from typing import Sequence
from flake8.options import config
from flake8.options.manager import OptionManager
@ -15,34 +17,16 @@ LOG = logging.getLogger(__name__)
def aggregate_options(
manager: OptionManager,
config_finder: config.ConfigFileFinder,
argv: List[str],
cfg: configparser.RawConfigParser,
cfg_dir: str,
argv: Optional[Sequence[str]],
) -> argparse.Namespace:
"""Aggregate and merge CLI and config file options.
:param flake8.options.manager.OptionManager manager:
The instance of the OptionManager that we're presently using.
:param flake8.options.config.ConfigFileFinder config_finder:
The config file finder to use.
:param list argv:
The list of remaining command-line arguments that were unknown during
preliminary option parsing to pass to ``manager.parse_args``.
:returns:
Tuple of the parsed options and extra arguments returned by
``manager.parse_args``.
:rtype:
tuple(argparse.Namespace, list)
"""
"""Aggregate and merge CLI and config file options."""
# Get defaults from the option parser
default_values = manager.parse_args([])
# Make our new configuration file mergerator
config_parser = config.ConfigParser(
option_manager=manager, config_finder=config_finder
)
# Get the parsed config
parsed_config = config_parser.parse()
parsed_config = config.parse_config(manager, cfg, cfg_dir)
# Extend the default ignore value with the extended default ignore list,
# registered by plugins.
@ -70,7 +54,9 @@ def aggregate_options(
# If the config name is somehow different from the destination name,
# fetch the destination name from our Option
if not hasattr(default_values, config_name):
dest_name = config_parser.config_options[config_name].dest
dest_val = manager.config_options_dict[config_name].dest
assert isinstance(dest_val, str)
dest_name = dest_val
LOG.debug(
'Overriding default value of (%s) for "%s" with (%s)',

View file

@ -1,318 +1,108 @@
"""Config handling logic for Flake8."""
import collections
import configparser
import logging
import os.path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from flake8 import utils
from flake8.options.manager import OptionManager
LOG = logging.getLogger(__name__)
__all__ = ("ConfigFileFinder", "ConfigParser")
class ConfigFileFinder:
"""Encapsulate the logic for finding and reading config files."""
def __init__(
self,
program_name: str,
extra_config_files: Optional[List[str]] = None,
config_file: Optional[str] = None,
ignore_config_files: bool = False,
) -> None:
"""Initialize object to find config files.
:param str program_name:
Name of the current program (e.g., flake8).
:param list extra_config_files:
Extra configuration files specified by the user to read.
:param str config_file:
Configuration file override to only read configuration from.
:param bool ignore_config_files:
Determine whether to ignore configuration files or not.
"""
# The values of --append-config from the CLI
if extra_config_files is None:
extra_config_files = []
self.extra_config_files = utils.normalize_paths(extra_config_files)
# The value of --config from the CLI.
self.config_file = config_file
# The value of --isolated from the CLI.
self.ignore_config_files = ignore_config_files
# User configuration file.
self.program_name = program_name
# List of filenames to find in the local/project directory
self.project_filenames = ("setup.cfg", "tox.ini", f".{program_name}")
self.local_directory = os.path.abspath(os.curdir)
@staticmethod
def _read_config(
*files: str,
) -> Tuple[configparser.RawConfigParser, List[str]]:
config = configparser.RawConfigParser()
found_files = []
for filename in files:
def _find_config_file(path: str) -> Optional[str]:
cfg = configparser.RawConfigParser()
while True:
for candidate in ("setup.cfg", "tox.ini", ".flake8"):
cfg_path = os.path.join(path, candidate)
try:
found_files.extend(config.read(filename))
except UnicodeDecodeError:
LOG.exception(
"There was an error decoding a config file."
"The file with a problem was %s.",
filename,
)
except configparser.ParsingError:
LOG.exception(
"There was an error trying to parse a config "
"file. The file with a problem was %s.",
filename,
)
return (config, found_files)
cfg.read(cfg_path)
except (UnicodeDecodeError, configparser.ParsingError) as e:
LOG.warning("ignoring unparseable config %s: %s", cfg_path, e)
else:
# only consider it a config if it contains flake8 sections
if "flake8" in cfg or "flake8:local-plugins" in cfg:
return cfg_path
def cli_config(self, files: str) -> configparser.RawConfigParser:
"""Read and parse the config file specified on the command-line."""
config, found_files = self._read_config(files)
if found_files:
LOG.debug("Found cli configuration files: %s", found_files)
return config
new_path = os.path.dirname(path)
if new_path == path:
break
else:
path = new_path
def generate_possible_local_files(self):
"""Find and generate all local config files."""
parent = tail = os.getcwd()
found_config_files = False
while tail and not found_config_files:
for project_filename in self.project_filenames:
filename = os.path.abspath(
os.path.join(parent, project_filename)
)
if os.path.exists(filename):
yield filename
found_config_files = True
self.local_directory = parent
(parent, tail) = os.path.split(parent)
def local_config_files(self):
"""Find all local config files which actually exist.
Filter results from
:meth:`~ConfigFileFinder.generate_possible_local_files` based
on whether the filename exists or not.
:returns:
List of files that exist that are local project config files with
extra config files appended to that list (which also exist).
:rtype:
[str]
"""
exists = os.path.exists
return [
filename for filename in self.generate_possible_local_files()
] + [f for f in self.extra_config_files if exists(f)]
def local_configs_with_files(self):
"""Parse all local config files into one config object.
Return (config, found_config_files) tuple.
"""
config, found_files = self._read_config(*self.local_config_files())
if found_files:
LOG.debug("Found local configuration files: %s", found_files)
return (config, found_files)
def local_configs(self):
"""Parse all local config files into one config object."""
return self.local_configs_with_files()[0]
# did not find any configuration file
return None
class ConfigParser:
"""Encapsulate merging different types of configuration files.
def load_config(
config: Optional[str],
extra: List[str],
*,
isolated: bool = False,
) -> Tuple[configparser.RawConfigParser, str]:
"""Load the configuration given the user options.
This parses out the options registered that were specified in the
configuration files, handles extra configuration files, and returns
dictionaries with the parsed values.
- in ``isolated`` mode, return an empty configuration
- if a config file is given in ``config`` use that, otherwise attempt to
discover a configuration using ``tox.ini`` / ``setup.cfg`` / ``.flake8``
- finally, load any ``extra`` configuration files
"""
pwd = os.path.abspath(".")
#: Set of actions that should use the
#: :meth:`~configparser.RawConfigParser.getbool` method.
GETBOOL_ACTIONS = {"store_true", "store_false"}
if isolated:
return configparser.RawConfigParser(), pwd
def __init__(self, option_manager, config_finder):
"""Initialize the ConfigParser instance.
if config is None:
config = _find_config_file(pwd)
:param flake8.options.manager.OptionManager option_manager:
Initialized OptionManager.
:param flake8.options.config.ConfigFileFinder config_finder:
Initialized ConfigFileFinder.
"""
#: Our instance of flake8.options.manager.OptionManager
self.option_manager = option_manager
#: The prog value for the cli parser
self.program_name = option_manager.program_name
#: Mapping of configuration option names to
#: :class:`~flake8.options.manager.Option` instances
self.config_options = option_manager.config_options_dict
#: Our instance of our :class:`~ConfigFileFinder`
self.config_finder = config_finder
def _normalize_value(self, option, value, parent=None):
if parent is None:
parent = self.config_finder.local_directory
final_value = option.normalize(value, parent)
LOG.debug(
'%r has been normalized to %r for option "%s"',
value,
final_value,
option.config_name,
)
return final_value
def _parse_config(self, config_parser, parent=None):
config_dict = {}
for option_name in config_parser.options(self.program_name):
if option_name not in self.config_options:
LOG.debug(
'Option "%s" is not registered. Ignoring.', option_name
)
continue
option = self.config_options[option_name]
# Use the appropriate method to parse the config value
method = config_parser.get
if option.type is int or option.action == "count":
method = config_parser.getint
elif option.action in self.GETBOOL_ACTIONS:
method = config_parser.getboolean
value = method(self.program_name, option_name)
LOG.debug('Option "%s" returned value: %r', option_name, value)
final_value = self._normalize_value(option, value, parent)
config_dict[option.config_name] = final_value
return config_dict
def is_configured_by(self, config):
"""Check if the specified config parser has an appropriate section."""
return config.has_section(self.program_name)
def parse_local_config(self):
"""Parse and return the local configuration files."""
config = self.config_finder.local_configs()
if not self.is_configured_by(config):
LOG.debug(
"Local configuration files have no %s section",
self.program_name,
)
return {}
LOG.debug("Parsing local configuration files.")
return self._parse_config(config)
def parse_cli_config(self, config_path):
"""Parse and return the file specified by --config."""
config = self.config_finder.cli_config(config_path)
if not self.is_configured_by(config):
LOG.debug(
"CLI configuration files have no %s section",
self.program_name,
)
return {}
LOG.debug("Parsing CLI configuration files.")
return self._parse_config(config, os.path.dirname(config_path))
def parse(self):
"""Parse and return the local config files.
:returns:
Dictionary of parsed configuration options
:rtype:
dict
"""
if self.config_finder.ignore_config_files:
LOG.debug(
"Refusing to parse configuration files due to user-"
"requested isolation"
)
return {}
if self.config_finder.config_file:
LOG.debug(
"Ignoring user and locally found configuration files. "
'Reading only configuration from "%s" specified via '
"--config by the user",
self.config_finder.config_file,
)
return self.parse_cli_config(self.config_finder.config_file)
return self.parse_local_config()
def get_local_plugins(config_finder):
"""Get local plugins lists from config files.
:param flake8.options.config.ConfigFileFinder config_finder:
The config file finder to use.
:returns:
LocalPlugins namedtuple containing two lists of plugin strings,
one for extension (checker) plugins and one for report plugins.
:rtype:
flake8.options.config.LocalPlugins
"""
local_plugins = LocalPlugins(extension=[], report=[], paths=[])
if config_finder.ignore_config_files:
LOG.debug(
"Refusing to look for local plugins in configuration"
"files due to user-requested isolation"
)
return local_plugins
if config_finder.config_file:
LOG.debug(
'Reading local plugins only from "%s" specified via '
"--config by the user",
config_finder.config_file,
)
config = config_finder.cli_config(config_finder.config_file)
config_files = [config_finder.config_file]
cfg = configparser.RawConfigParser()
if config is not None:
cfg.read(config)
cfg_dir = os.path.dirname(config)
else:
config, config_files = config_finder.local_configs_with_files()
cfg_dir = pwd
base_dirs = {os.path.dirname(cf) for cf in config_files}
# TODO: remove this and replace it with configuration modifying plugins
# read the additional configs afterwards
for filename in extra:
cfg.read(filename)
section = f"{config_finder.program_name}:local-plugins"
for plugin_type in ["extension", "report"]:
if config.has_option(section, plugin_type):
local_plugins_string = config.get(section, plugin_type).strip()
plugin_type_list = getattr(local_plugins, plugin_type)
plugin_type_list.extend(
utils.parse_comma_separated_list(
local_plugins_string, regexp=utils.LOCAL_PLUGIN_LIST_RE
)
)
if config.has_option(section, "paths"):
raw_paths = utils.parse_comma_separated_list(
config.get(section, "paths").strip()
)
norm_paths: List[str] = []
for base_dir in base_dirs:
norm_paths.extend(
path
for path in utils.normalize_paths(raw_paths, parent=base_dir)
if os.path.exists(path)
)
local_plugins.paths.extend(norm_paths)
return local_plugins
return cfg, cfg_dir
LocalPlugins = collections.namedtuple("LocalPlugins", "extension report paths")
def parse_config(
option_manager: OptionManager,
cfg: configparser.RawConfigParser,
cfg_dir: str,
) -> Dict[str, Any]:
"""Parse and normalize the typed configuration options."""
if "flake8" not in cfg:
return {}
config_dict = {}
for option_name in cfg["flake8"]:
option = option_manager.config_options_dict.get(option_name)
if option is None:
LOG.debug('Option "%s" is not registered. Ignoring.', option_name)
continue
# Use the appropriate method to parse the config value
value: Any
if option.type is int or option.action == "count":
value = cfg.getint("flake8", option_name)
elif option.action in {"store_true", "store_false"}:
value = cfg.getboolean("flake8", option_name)
else:
value = cfg.get("flake8", option_name)
LOG.debug('Option "%s" returned value: %r', option_name, value)
final_value = option.normalize(value, cfg_dir)
assert option.config_name is not None
config_dict[option.config_name] = final_value
return config_dict

View file

@ -462,7 +462,7 @@ class OptionManager:
def parse_args(
self,
args: Optional[List[str]] = None,
args: Optional[Sequence[str]] = None,
values: Optional[argparse.Namespace] = None,
) -> argparse.Namespace:
"""Proxy to calling the OptionParser's parse_args method."""