have OptionManager take plugin versions directly

This commit is contained in:
Anthony Sottile 2021-12-08 15:49:17 -05:00
parent fed77cd60a
commit f98d52a398
9 changed files with 73 additions and 255 deletions

View file

@ -113,8 +113,6 @@ disallow_untyped_defs = false
disallow_untyped_defs = false disallow_untyped_defs = false
[mypy-flake8.main.application] [mypy-flake8.main.application]
disallow_untyped_defs = false disallow_untyped_defs = false
[mypy-flake8.main.debug]
disallow_untyped_defs = false
[mypy-flake8.plugins.manager] [mypy-flake8.plugins.manager]
disallow_untyped_defs = false disallow_untyped_defs = false

View file

@ -37,7 +37,7 @@ LOG = logging.getLogger(__name__)
class Application: class Application:
"""Abstract our application into a class.""" """Abstract our application into a class."""
def __init__(self, program="flake8", version=flake8.__version__): def __init__(self) -> None:
"""Initialize our application. """Initialize our application.
:param str program: :param str program:
@ -49,21 +49,12 @@ class Application:
self.start_time = time.time() self.start_time = time.time()
#: The timestamp when the Application finished reported errors. #: The timestamp when the Application finished reported errors.
self.end_time: Optional[float] = None self.end_time: Optional[float] = None
#: The name of the program being run
self.program = program
#: The version of the program being run
self.version = version
#: The prelimary argument parser for handling options required for #: The prelimary argument parser for handling options required for
#: obtaining and parsing the configuration file. #: obtaining and parsing the configuration file.
self.prelim_arg_parser = options.stage1_arg_parser() self.prelim_arg_parser = options.stage1_arg_parser()
#: The instance of :class:`flake8.options.manager.OptionManager` used #: The instance of :class:`flake8.options.manager.OptionManager` used
#: to parse and handle the options and arguments passed by the user #: to parse and handle the options and arguments passed by the user
self.option_manager = manager.OptionManager( self.option_manager: Optional[manager.OptionManager] = None
prog="flake8",
version=flake8.__version__,
parents=[self.prelim_arg_parser],
)
options.register_default_options(self.option_manager)
#: The instance of :class:`flake8.plugins.manager.Checkers` #: The instance of :class:`flake8.plugins.manager.Checkers`
self.check_plugins: Optional[plugin_manager.Checkers] = None self.check_plugins: Optional[plugin_manager.Checkers] = None
@ -166,9 +157,19 @@ class Application:
def register_plugin_options(self) -> None: def register_plugin_options(self) -> None:
"""Register options provided by plugins to our option manager.""" """Register options provided by plugins to our option manager."""
assert self.check_plugins is not None assert self.check_plugins is not None
self.check_plugins.register_options(self.option_manager)
self.check_plugins.register_plugin_versions(self.option_manager)
assert self.formatting_plugins is not None assert self.formatting_plugins is not None
versions = sorted(set(self.check_plugins.manager.versions()))
self.option_manager = manager.OptionManager(
version=flake8.__version__,
plugin_versions=", ".join(
f"{name}: {version}" for name, version in versions
),
parents=[self.prelim_arg_parser],
)
options.register_default_options(self.option_manager)
self.check_plugins.register_options(self.option_manager)
self.formatting_plugins.register_options(self.option_manager) self.formatting_plugins.register_options(self.option_manager)
def parse_configuration_and_cli( def parse_configuration_and_cli(
@ -178,6 +179,7 @@ class Application:
argv: List[str], argv: List[str],
) -> None: ) -> None:
"""Parse configuration files and the CLI options.""" """Parse configuration files and the CLI options."""
assert self.option_manager is not None
self.options = aggregator.aggregate_options( self.options = aggregator.aggregate_options(
self.option_manager, self.option_manager,
cfg, cfg,
@ -186,7 +188,8 @@ class Application:
) )
if self.options.bug_report: if self.options.bug_report:
info = debug.information(self.option_manager) assert self.check_plugins is not None
info = debug.information(flake8.__version__, self.check_plugins)
print(json.dumps(info, indent=2, sort_keys=True)) print(json.dumps(info, indent=2, sort_keys=True))
raise SystemExit(0) raise SystemExit(0)

View file

@ -1,12 +1,20 @@
"""Module containing the logic for our debugging logic.""" """Module containing the logic for our debugging logic."""
import platform import platform
from typing import Any
from typing import Dict
from typing import List
from flake8.plugins.manager import PluginTypeManager
def information(option_manager): def information(
version: str,
plugins: PluginTypeManager,
) -> Dict[str, Any]:
"""Generate the information to be printed for the bug report.""" """Generate the information to be printed for the bug report."""
return { return {
"version": option_manager.version, "version": version,
"plugins": plugins_from(option_manager), "plugins": plugins_from(plugins),
"platform": { "platform": {
"python_implementation": platform.python_implementation(), "python_implementation": platform.python_implementation(),
"python_version": platform.python_version(), "python_version": platform.python_version(),
@ -15,13 +23,9 @@ def information(option_manager):
} }
def plugins_from(option_manager): def plugins_from(plugins: PluginTypeManager) -> List[Dict[str, str]]:
"""Generate the list of plugins installed.""" """Generate the list of plugins installed."""
return [ return [
{ {"plugin": name, "version": version}
"plugin": plugin.name, for name, version in sorted(set(plugins.manager.versions()))
"version": plugin.version,
"is_local": plugin.local,
}
for plugin in sorted(option_manager.registered_plugins)
] ]

View file

@ -1,13 +1,11 @@
"""Option handling and Option management logic.""" """Option handling and Option management logic."""
import argparse import argparse
import collections
import contextlib import contextlib
import enum import enum
import functools import functools
import logging import logging
from typing import Any from typing import Any
from typing import Callable from typing import Callable
from typing import cast
from typing import Dict from typing import Dict
from typing import Generator from typing import Generator
from typing import List from typing import List
@ -316,20 +314,15 @@ class Option:
return self.option_args, self.filtered_option_kwargs return self.option_args, self.filtered_option_kwargs
PluginVersion = collections.namedtuple(
"PluginVersion", ["name", "version", "local"]
)
class OptionManager: class OptionManager:
"""Manage Options and OptionParser while adding post-processing.""" """Manage Options and OptionParser while adding post-processing."""
def __init__( def __init__(
self, self,
prog: str, *,
version: str, version: str,
usage: str = "%(prog)s [options] file file ...", plugin_versions: str,
parents: Optional[List[argparse.ArgumentParser]] = None, parents: List[argparse.ArgumentParser],
) -> None: ) -> None:
"""Initialize an instance of an OptionManager. """Initialize an instance of an OptionManager.
@ -343,28 +336,28 @@ class OptionManager:
A list of ArgumentParser objects whose arguments should also be A list of ArgumentParser objects whose arguments should also be
included. included.
""" """
if parents is None: self.parser = argparse.ArgumentParser(
parents = [] prog="flake8",
usage="%(prog)s [options] file file ...",
self.parser: argparse.ArgumentParser = argparse.ArgumentParser( parents=parents,
prog=prog, usage=usage, parents=parents epilog=f"Installed plugins: {plugin_versions}",
) )
self._current_group: Optional[argparse._ArgumentGroup] = None self.parser.add_argument(
self.version_action = cast( "--version",
"argparse._VersionAction", action="version",
self.parser.add_argument( version=(
"--version", action="version", version=version f"{version} ({plugin_versions}) "
f"{utils.get_python_version()}"
), ),
) )
self.parser.add_argument("filenames", nargs="*", metavar="filename") self.parser.add_argument("filenames", nargs="*", metavar="filename")
self.config_options_dict: Dict[str, Option] = {} self.config_options_dict: Dict[str, Option] = {}
self.options: List[Option] = [] self.options: List[Option] = []
self.program_name = prog
self.version = version
self.registered_plugins: Set[PluginVersion] = set()
self.extended_default_ignore: Set[str] = set() self.extended_default_ignore: Set[str] = set()
self.extended_default_select: Set[str] = set() self.extended_default_select: Set[str] = set()
self._current_group: Optional[argparse._ArgumentGroup] = None
@contextlib.contextmanager @contextlib.contextmanager
def group(self, name: str) -> Generator[None, None, None]: def group(self, name: str) -> Generator[None, None, None]:
"""Attach options to an argparse group during this context.""" """Attach options to an argparse group during this context."""
@ -395,7 +388,7 @@ class OptionManager:
self.options.append(option) self.options.append(option)
if option.parse_from_config: if option.parse_from_config:
name = option.config_name name = option.config_name
assert name is not None # nosec (for mypy) assert name is not None
self.config_options_dict[name] = option self.config_options_dict[name] = option
self.config_options_dict[name.replace("_", "-")] = option self.config_options_dict[name.replace("_", "-")] = option
LOG.debug('Registered option "%s".', option) LOG.debug('Registered option "%s".', option)
@ -438,63 +431,12 @@ class OptionManager:
LOG.debug("Extending default select list with %r", error_codes) LOG.debug("Extending default select list with %r", error_codes)
self.extended_default_select.update(error_codes) self.extended_default_select.update(error_codes)
def generate_versions(
self, format_str: str = "%(name)s: %(version)s", join_on: str = ", "
) -> str:
"""Generate a comma-separated list of versions of plugins."""
return join_on.join(
format_str % plugin._asdict()
for plugin in sorted(self.registered_plugins)
)
def update_version_string(self) -> None:
"""Update the flake8 version string."""
self.version_action.version = "{} ({}) {}".format(
self.version, self.generate_versions(), utils.get_python_version()
)
def generate_epilog(self) -> None:
"""Create an epilog with the version and name of each of plugin."""
plugin_version_format = "%(name)s: %(version)s"
self.parser.epilog = "Installed plugins: " + self.generate_versions(
plugin_version_format
)
def parse_args( def parse_args(
self, self,
args: Optional[Sequence[str]] = None, args: Optional[Sequence[str]] = None,
values: Optional[argparse.Namespace] = None, values: Optional[argparse.Namespace] = None,
) -> argparse.Namespace: ) -> argparse.Namespace:
"""Proxy to calling the OptionParser's parse_args method.""" """Proxy to calling the OptionParser's parse_args method."""
self.generate_epilog()
self.update_version_string()
if values: if values:
self.parser.set_defaults(**vars(values)) self.parser.set_defaults(**vars(values))
return self.parser.parse_args(args) return self.parser.parse_args(args)
def parse_known_args(
self, args: Optional[List[str]] = None
) -> Tuple[argparse.Namespace, List[str]]:
"""Parse only the known arguments from the argument values.
Replicate a little argparse behaviour while we're still on
optparse.
"""
self.generate_epilog()
self.update_version_string()
return self.parser.parse_known_args(args)
def register_plugin(
self, name: str, version: str, local: bool = False
) -> None:
"""Register a plugin relying on the OptionManager.
:param str name:
The name of the checker itself. This will be the ``name``
attribute of the class or function loaded from the entry-point.
:param str version:
The version of the checker that we're using.
:param bool local:
Whether the plugin is local to the project/repository or not.
"""
self.registered_plugins.add(PluginVersion(name, version, local))

View file

@ -427,12 +427,6 @@ class PluginTypeManager:
# Do not set plugins_loaded if we run into an exception # Do not set plugins_loaded if we run into an exception
self.plugins_loaded = True self.plugins_loaded = True
def register_plugin_versions(self, optmanager):
"""Register the plugins and their versions with the OptionManager."""
self.load_plugins()
for (plugin_name, version) in self.manager.versions():
optmanager.register_plugin(name=plugin_name, version=version)
def register_options(self, optmanager): def register_options(self, optmanager):
"""Register all of the checkers' options to the OptionManager.""" """Register all of the checkers' options to the OptionManager."""
self.load_plugins() self.load_plugins()

View file

@ -13,8 +13,9 @@ from flake8.options import manager
def optmanager(): def optmanager():
"""Create a new OptionManager.""" """Create a new OptionManager."""
option_manager = manager.OptionManager( option_manager = manager.OptionManager(
prog="flake8",
version="3.0.0", version="3.0.0",
plugin_versions="",
parents=[],
) )
options.register_default_options(option_manager) options.register_default_options(option_manager)
return option_manager return option_manager

View file

@ -4,58 +4,24 @@ from unittest import mock
import pytest import pytest
from flake8.main import debug from flake8.main import debug
from flake8.options import manager
@pytest.mark.parametrize( @pytest.mark.parametrize(
"plugins, expected", ("versions", "expected"),
[ (
([], []), ([], []),
( (
[manager.PluginVersion("pycodestyle", "2.0.0", False)], [("p1", "1"), ("p2", "2"), ("p1", "1")],
[ [
{ {"plugin": "p1", "version": "1"},
"plugin": "pycodestyle", {"plugin": "p2", "version": "2"},
"version": "2.0.0",
"is_local": False,
}
], ],
), ),
( ),
[
manager.PluginVersion("pycodestyle", "2.0.0", False),
manager.PluginVersion("mccabe", "0.5.9", False),
],
[
{"plugin": "mccabe", "version": "0.5.9", "is_local": False},
{
"plugin": "pycodestyle",
"version": "2.0.0",
"is_local": False,
},
],
),
(
[
manager.PluginVersion("pycodestyle", "2.0.0", False),
manager.PluginVersion("my-local", "0.0.1", True),
manager.PluginVersion("mccabe", "0.5.9", False),
],
[
{"plugin": "mccabe", "version": "0.5.9", "is_local": False},
{"plugin": "my-local", "version": "0.0.1", "is_local": True},
{
"plugin": "pycodestyle",
"version": "2.0.0",
"is_local": False,
},
],
),
],
) )
def test_plugins_from(plugins, expected): def test_plugins_from(versions, expected):
"""Test that we format plugins appropriately.""" """Test that we format plugins appropriately."""
option_manager = mock.Mock(registered_plugins=set(plugins)) option_manager = mock.Mock(**{"manager.versions.return_value": versions})
assert expected == debug.plugins_from(option_manager) assert expected == debug.plugins_from(option_manager)
@ -67,8 +33,8 @@ def test_information(system, pyversion, pyimpl):
expected = { expected = {
"version": "3.1.0", "version": "3.1.0",
"plugins": [ "plugins": [
{"plugin": "mccabe", "version": "0.5.9", "is_local": False}, {"plugin": "mccabe", "version": "0.5.9"},
{"plugin": "pycodestyle", "version": "2.0.0", "is_local": False}, {"plugin": "pycodestyle", "version": "2.0.0"},
], ],
"platform": { "platform": {
"python_implementation": "CPython", "python_implementation": "CPython",
@ -76,14 +42,15 @@ def test_information(system, pyversion, pyimpl):
"system": "Linux", "system": "Linux",
}, },
} }
option_manager = mock.Mock( plugins = mock.Mock(
registered_plugins={ **{
manager.PluginVersion("pycodestyle", "2.0.0", False), "manager.versions.return_value": [
manager.PluginVersion("mccabe", "0.5.9", False), ("pycodestyle", "2.0.0"),
}, ("mccabe", "0.5.9"),
version="3.1.0", ]
}
) )
assert expected == debug.information(option_manager) assert expected == debug.information("3.1.0", plugins)
pyimpl.assert_called_once_with() pyimpl.assert_called_once_with()
pyversion.assert_called_once_with() pyversion.assert_called_once_with()
system.assert_called_once_with() system.assert_called_once_with()

View file

@ -5,7 +5,6 @@ from unittest import mock
import pytest import pytest
from flake8 import utils
from flake8.main.options import JobsArgument from flake8.main.options import JobsArgument
from flake8.options import manager from flake8.options import manager
@ -15,7 +14,9 @@ TEST_VERSION = "3.0.0b1"
@pytest.fixture @pytest.fixture
def optmanager(): def optmanager():
"""Generate a simple OptionManager with default test arguments.""" """Generate a simple OptionManager with default test arguments."""
return manager.OptionManager(prog="flake8", version=TEST_VERSION) return manager.OptionManager(
version=TEST_VERSION, plugin_versions="", parents=[]
)
def test_option_manager_creates_option_parser(optmanager): def test_option_manager_creates_option_parser(optmanager):
@ -31,7 +32,7 @@ def test_option_manager_including_parent_options():
# WHEN # WHEN
optmanager = manager.OptionManager( optmanager = manager.OptionManager(
prog="flake8", version=TEST_VERSION, parents=[parent_parser] version=TEST_VERSION, plugin_versions="", parents=[parent_parser]
) )
options = optmanager.parse_args(["--parent", "foo"]) options = optmanager.parse_args(["--parent", "foo"])
@ -153,90 +154,6 @@ def test_parse_args_normalize_paths(optmanager):
] ]
def test_generate_versions(optmanager):
"""Verify a comma-separated string is generated of registered plugins."""
optmanager.registered_plugins = [
manager.PluginVersion("Testing 100", "0.0.0", False),
manager.PluginVersion("Testing 101", "0.0.0", False),
manager.PluginVersion("Testing 300", "0.0.0", True),
]
assert (
optmanager.generate_versions()
== "Testing 100: 0.0.0, Testing 101: 0.0.0, Testing 300: 0.0.0"
)
def test_plugins_are_sorted_in_generate_versions(optmanager):
"""Verify we sort before joining strings in generate_versions."""
optmanager.registered_plugins = [
manager.PluginVersion("pyflakes", "1.5.0", False),
manager.PluginVersion("mccabe", "0.7.0", False),
manager.PluginVersion("pycodestyle", "2.2.0", False),
manager.PluginVersion("flake8-docstrings", "0.6.1", False),
manager.PluginVersion("flake8-bugbear", "2016.12.1", False),
]
assert (
optmanager.generate_versions() == "flake8-bugbear: 2016.12.1, "
"flake8-docstrings: 0.6.1, "
"mccabe: 0.7.0, "
"pycodestyle: 2.2.0, "
"pyflakes: 1.5.0"
)
def test_generate_versions_with_format_string(optmanager):
"""Verify a comma-separated string is generated of registered plugins."""
optmanager.registered_plugins.update(
[
manager.PluginVersion("Testing", "0.0.0", False),
manager.PluginVersion("Testing", "0.0.0", False),
manager.PluginVersion("Testing", "0.0.0", False),
]
)
assert optmanager.generate_versions() == "Testing: 0.0.0"
def test_update_version_string(optmanager):
"""Verify we update the version string idempotently."""
assert optmanager.version == TEST_VERSION
assert optmanager.version_action.version == TEST_VERSION
optmanager.registered_plugins = [
manager.PluginVersion("Testing 100", "0.0.0", False),
manager.PluginVersion("Testing 101", "0.0.0", False),
manager.PluginVersion("Testing 300", "0.0.0", False),
]
optmanager.update_version_string()
assert optmanager.version == TEST_VERSION
assert (
optmanager.version_action.version
== TEST_VERSION
+ " (Testing 100: 0.0.0, Testing 101: 0.0.0, Testing 300: 0.0.0) "
+ utils.get_python_version()
)
def test_generate_epilog(optmanager):
"""Verify how we generate the epilog for help text."""
assert optmanager.parser.epilog is None
optmanager.registered_plugins = [
manager.PluginVersion("Testing 100", "0.0.0", False),
manager.PluginVersion("Testing 101", "0.0.0", False),
manager.PluginVersion("Testing 300", "0.0.0", False),
]
expected_value = (
"Installed plugins: Testing 100: 0.0.0, Testing 101: 0.0.0, Testing"
" 300: 0.0.0"
)
optmanager.generate_epilog()
assert optmanager.parser.epilog == expected_value
def test_extend_default_ignore(optmanager): def test_extend_default_ignore(optmanager):
"""Verify that we update the extended default ignore list.""" """Verify that we update the extended default ignore list."""
assert optmanager.extended_default_ignore == set() assert optmanager.extended_default_ignore == set()
@ -245,14 +162,6 @@ def test_extend_default_ignore(optmanager):
assert optmanager.extended_default_ignore == {"T100", "T101", "T102"} assert optmanager.extended_default_ignore == {"T100", "T101", "T102"}
def test_parse_known_args(optmanager):
"""Verify we ignore unknown options."""
with mock.patch("sys.exit") as sysexit:
optmanager.parse_known_args(["--max-complexity", "5"])
assert sysexit.called is False
def test_optparse_normalize_callback_option_legacy(optmanager): def test_optparse_normalize_callback_option_legacy(optmanager):
"""Test the optparse shim for `callback=`.""" """Test the optparse shim for `callback=`."""
callback_foo = mock.Mock() callback_foo = mock.Mock()

View file

@ -119,7 +119,7 @@ def test_load_config_append_config(tmpdir):
@pytest.fixture @pytest.fixture
def opt_manager(): def opt_manager():
ret = OptionManager(prog="flake8", version="123") ret = OptionManager(version="123", plugin_versions="", parents=[])
register_default_options(ret) register_default_options(ret)
return ret return ret