Merge branch 'main' into importlib-metadata-5-py37

This commit is contained in:
dill0wn 2022-11-18 11:18:06 -05:00 committed by GitHub
commit f1c7487043
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
89 changed files with 693 additions and 1434 deletions

2
.github/FUNDING.yml vendored Normal file
View file

@ -0,0 +1,2 @@
github: asottile
tidelift: pypi/pre-commit

5
.github/SECURITY.md vendored Normal file
View file

@ -0,0 +1,5 @@
## security contact information
to report a security vulnerability, please use the
[Tidelift security contact](https://tidelift.com/security).
Tidelift will coordinate the fix and disclosure.

View file

@ -15,9 +15,6 @@ jobs:
- os: ubuntu-latest
python: pypy-3.7
toxenv: py
- os: ubuntu-latest
python: 3.6
toxenv: py
- os: ubuntu-latest
python: 3.7
toxenv: py
@ -32,7 +29,7 @@ jobs:
toxenv: py
# windows
- os: windows-latest
python: 3.6
python: 3.7
toxenv: py
# misc
- os: ubuntu-latest

View file

@ -8,26 +8,30 @@ repos:
- id: trailing-whitespace
exclude: ^tests/fixtures/
- repo: https://github.com/asottile/reorder_python_imports
rev: v3.8.2
rev: v3.9.0
hooks:
- id: reorder-python-imports
args: [--application-directories, '.:src', --py36-plus]
args: [
--application-directories, '.:src',
--py37-plus,
--add-import, 'from __future__ import annotations',
]
- repo: https://github.com/asottile/pyupgrade
rev: v2.37.3
rev: v3.2.2
hooks:
- id: pyupgrade
args: [--py36-plus]
args: [--py37-plus]
- repo: https://github.com/psf/black
rev: 22.6.0
rev: 22.10.0
hooks:
- id: black
args: [--line-length=79]
- repo: https://github.com/PyCQA/flake8
rev: 5.0.3
rev: 5.0.4
hooks:
- id: flake8
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.971
rev: v0.990
hooks:
- id: mypy
exclude: ^(docs/|example-plugin/)

View file

@ -1,11 +1,12 @@
#!/usr/bin/env python3
from __future__ import annotations
import inspect
import os.path
from typing import Any
from typing import Callable
from typing import Generator
from typing import NamedTuple
from typing import Tuple
import pycodestyle
@ -20,7 +21,7 @@ def _too_long(s: str) -> str:
class Call(NamedTuple):
name: str
is_generator: bool
params: Tuple[str, ...]
params: tuple[str, ...]
def to_src(self) -> str:
params_s = ", ".join(self.params)
@ -35,7 +36,7 @@ class Call(NamedTuple):
return "\n".join(lines)
@classmethod
def from_func(cls, func: Callable[..., Any]) -> "Call":
def from_func(cls, func: Callable[..., Any]) -> Call:
spec = inspect.getfullargspec(func)
params = tuple(spec.args)
return cls(func.__name__, inspect.isgeneratorfunction(func), params)
@ -55,9 +56,10 @@ def lines() -> Generator[str, None, None]:
yield f'"""Generated using ./bin/{os.path.basename(__file__)}."""'
yield "# fmt: off"
yield "from __future__ import annotations"
yield ""
yield "from typing import Any"
yield "from typing import Generator"
yield "from typing import Tuple"
yield ""
imports = sorted(call.name for call in logical + physical)
for name in imports:
@ -69,7 +71,7 @@ def lines() -> Generator[str, None, None]:
logical_params = {param for call in logical for param in call.params}
for param in sorted(logical_params):
yield f" {param}: Any,"
yield ") -> Generator[Tuple[int, str], None, None]:"
yield ") -> Generator[tuple[int, str], None, None]:"
yield ' """Run pycodestyle logical checks."""'
for call in sorted(logical):
yield call.to_src()
@ -80,7 +82,7 @@ def lines() -> Generator[str, None, None]:
physical_params = {param for call in physical for param in call.params}
for param in sorted(physical_params):
yield f" {param}: Any,"
yield ") -> Generator[Tuple[int, str], None, None]:"
yield ") -> Generator[tuple[int, str], None, None]:"
yield ' """Run pycodestyle physical checks."""'
for call in sorted(physical):
yield call.to_src()

View file

@ -14,6 +14,8 @@
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
# sys.path.insert(0, os.path.abspath('.'))
from __future__ import annotations
import flake8
# -- General configuration ------------------------------------------------

View file

@ -60,11 +60,11 @@ If you only want to see the instances of a specific warning or error, you can
flake8 --select E123,W503 path/to/code/
Alternatively, if you want to *ignore* only one specific warning or error:
Alternatively, if you want to add a specific warning or error to *ignore*:
.. code::
flake8 --ignore E24,W504 path/to/code/
flake8 --extend-ignore E203,W234 path/to/code/
Please read our user guide for more information about how to use and configure
|Flake8|.

View file

@ -66,11 +66,3 @@ The standard library's :func:`fnmatch.fnmatch` is excellent at deciding if a
filename matches a single pattern. In our use case, however, we typically have
a list of patterns and want to know if the filename matches any of them. This
function abstracts that logic away with a little extra logic.
.. autofunction:: flake8.utils.parse_unified_diff
To handle usage of :option:`flake8 --diff`, |Flake8| needs to be able
to parse the name of the files in the diff as well as the ranges indicated the
sections that have been changed. This function either accepts the diff as an
argument or reads the diff from standard-in. It then returns a dictionary with
filenames as the keys and sets of line numbers as the value.

View file

@ -120,7 +120,7 @@ it would look like::
X10 = flake8_example:ExamplePlugin
In this casae as well as the following case, your entry-point name acts as
In this case as well as the following case, your entry-point name acts as
a prefix to the error codes produced by your plugin.
If all of your plugin's error codes start with ``X1`` then it would look
@ -143,6 +143,12 @@ i.e., ``ABC`` is better than ``A`` but ``ABCD`` is invalid.
*A 3 letters entry point prefix followed by 3 numbers (i.e.* ``ABC123`` *)
is currently the longest allowed entry point name.*
.. _off-by-default:
If your plugin is intended to be opt-in, it can set the attribute
``off_by_default = True``. Users of your plugin will then need to utilize
:ref:`enable-extensions<option-enable-extensions>` with your plugin's entry
point.
.. _Entry Points:
https://setuptools.readthedocs.io/en/latest/pkg_resources.html#entry-points

View file

@ -90,7 +90,7 @@ Let's actually look at |Flake8|'s own configuration section:
.. code-block:: ini
[flake8]
ignore = D203
extend-ignore = E203
exclude = .git,__pycache__,docs/source/conf.py,old,build,dist
max-complexity = 10
@ -98,7 +98,7 @@ This is equivalent to:
.. prompt:: bash
flake8 --ignore D203 \
flake8 --extend-ignore E203 \
--exclude .git,__pycache__,docs/source/conf.py,old,build,dist \
--max-complexity 10
@ -107,7 +107,7 @@ In our case, if we wanted to, we could also do
.. code-block:: ini
[flake8]
ignore = D203
extend-ignore = E203
exclude =
.git,
__pycache__,
@ -122,7 +122,7 @@ This allows us to add comments for why we're excluding items, e.g.
.. code-block:: ini
[flake8]
ignore = D203
extend-ignore = E203
exclude =
# No need to traverse our git directory
.git,
@ -190,7 +190,7 @@ look at a portion of a project's Flake8 configuration in their ``tox.ini``:
# H404: multi line docstring should start without a leading new line
# H405: multi line docstring summary not separated with an empty line
# H501: Do not use self.__dict__ for string formatting
ignore = H101,H202,H233,H301,H306,H401,H403,H404,H405,H501
extend-ignore = H101,H202,H233,H301,H306,H401,H403,H404,H405,H501
They use the comments to describe the check but they could also write this as:
@ -198,7 +198,7 @@ They use the comments to describe the check but they could also write this as:
[flake8]
# it's not a bug that we aren't using all of hacking
ignore =
extend-ignore =
# H101: Use TODO(NAME)
H101,
# H202: assertRaises Exception too broad

View file

@ -86,69 +86,5 @@ And you should see something like:
Options:
--version show program's version number and exit
-h, --help show this help message and exit
-v, --verbose Print more information about what is happening in
flake8. This option is repeatable and will increase
verbosity each time it is repeated.
-q, --quiet Report only file names, or nothing. This option is
repeatable.
--count Print total number of errors and warnings to standard
error and set the exit code to 1 if total is not
empty.
--diff Report changes only within line number ranges in the
unified diff provided on standard in by the user.
--exclude=patterns Comma-separated list of files or directories to
exclude.(Default:
.svn,CVS,.bzr,.hg,.git,__pycache__,.tox,.nox,.eggs,
*.egg)
--filename=patterns Only check for filenames matching the patterns in this
comma-separated list. (Default: *.py)
--format=format Format errors according to the chosen formatter.
--hang-closing Hang closing bracket instead of matching indentation
of opening bracket's line.
--ignore=errors Comma-separated list of errors and warnings to ignore
(or skip). For example, ``--ignore=E4,E51,W234``.
(Default: E121,E123,E126,E226,E24,E704)
--extend-ignore=errors
Comma-separated list of errors and warnings to add to
the list of ignored ones. For example, ``--extend-
ignore=E4,E51,W234``.
--max-line-length=n Maximum allowed line length for the entirety of this
run. (Default: 79)
--select=errors Comma-separated list of errors and warnings to enable.
For example, ``--select=E4,E51,W234``. (Default: )
--extend-select errors
Comma-separated list of errors and warnings to add to
the list of selected ones. For example, ``--extend-
select=E4,E51,W234``.
--disable-noqa Disable the effect of "# noqa". This will report
errors on lines with "# noqa" at the end.
--show-source Show the source generate each error or warning.
--statistics Count errors and warnings.
--enabled-extensions=ENABLED_EXTENSIONS
Enable plugins and extensions that are otherwise
disabled by default
--exit-zero Exit with status code "0" even if there are errors.
-j JOBS, --jobs=JOBS Number of subprocesses to use to run checks in
parallel. This is ignored on Windows. The default,
"auto", will auto-detect the number of processors
available to use. (Default: auto)
--output-file=OUTPUT_FILE
Redirect report to a file.
--tee Write to stdout and output-file.
--append-config=APPEND_CONFIG
Provide extra config files to parse in addition to the
files found by Flake8 by default. These files are the
last ones read and so they take the highest precedence
when multiple files provide the same option.
--config=CONFIG Path to the config file that will be the authoritative
config source. This will cause Flake8 to ignore all
other configuration files.
--isolated Ignore all configuration files.
--builtins=BUILTINS define more built-ins, comma separated
--doctests check syntax of the doctests
--include-in-doctest=INCLUDE_IN_DOCTEST
Run doctests only on these files
--exclude-from-doctest=EXCLUDE_FROM_DOCTEST
Skip these files when running doctests
Installed plugins: pyflakes: 1.0.0, pep8: 1.7.0
...

View file

@ -44,8 +44,6 @@ Index of Options
- :option:`flake8 --count`
- :option:`flake8 --diff`
- :option:`flake8 --exclude`
- :option:`flake8 --filename`
@ -193,7 +191,7 @@ Options and their Descriptions
Possible options are ``auto``, ``always``, and ``never``.
This **can** be specified in config files.
This **can not** be specified in config files.
When color is enabled, the following substitutions are enabled:
@ -208,12 +206,6 @@ Options and their Descriptions
- ``%(white)s``
- ``%(reset)s``
Example config file usage:
.. code-block:: ini
color = never
.. option:: --count
@ -236,27 +228,6 @@ Options and their Descriptions
count = True
.. option:: --diff
:ref:`Go back to index <top>`
.. warning::
Due to hiding potential errors, this option is deprecated and will be
removed in a future version.
Use the unified diff provided on standard in to only check the modified
files and report errors included in the diff.
Command-line example:
.. prompt:: bash
git diff -u | flake8 --diff
This **can not** be specified in config files.
.. option:: --exclude=<patterns>
:ref:`Go back to index <top>`
@ -800,11 +771,13 @@ Options and their Descriptions
flake8-typing-extensions
.. _option-enable-extensions:
.. option:: --enable-extensions=<errors>
:ref:`Go back to index <top>`
Enable off-by-default extensions.
Enable :ref:`off-by-default<off-by-default>` extensions.
Plugins to |Flake8| have the option of registering themselves as
off-by-default. These plugins will not be loaded unless enabled by this

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import setuptools
setuptools.setup(

View file

@ -1,4 +1,6 @@
"""Module for an example Flake8 plugin."""
from __future__ import annotations
from .off_by_default import ExampleTwo
from .on_by_default import ExampleOne

View file

@ -1,4 +1,5 @@
"""Our first example plugin."""
from __future__ import annotations
class ExampleTwo:

View file

@ -1,4 +1,5 @@
"""Our first example plugin."""
from __future__ import annotations
class ExampleOne:

View file

@ -20,7 +20,6 @@ classifiers =
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
@ -44,7 +43,7 @@ install_requires =
pyflakes>=2.5.0,<2.6.0
importlib-metadata>=5;python_version=="3.7"
importlib-metadata>=1.1.0,<4.3;python_version<"3.7"
python_requires = >=3.6.1
python_requires = >=3.7
[options.packages.find]
where = src

View file

@ -1,4 +1,6 @@
"""Packaging logic for Flake8."""
from __future__ import annotations
import os
import sys

View file

@ -9,10 +9,10 @@ This module
.. autofunction:: flake8.configure_logging
"""
from __future__ import annotations
import logging
import sys
from typing import Optional
from typing import Type
LOG = logging.getLogger(__name__)
LOG.addHandler(logging.NullHandler())
@ -35,7 +35,7 @@ LOG_FORMAT = (
def configure_logging(
verbosity: int,
filename: Optional[str] = None,
filename: str | None = None,
logformat: str = LOG_FORMAT,
) -> None:
"""Configure logging for flake8.
@ -56,7 +56,7 @@ def configure_logging(
if not filename or filename in ("stderr", "stdout"):
fileobj = getattr(sys, filename or "stderr")
handler_cls: Type[logging.Handler] = logging.StreamHandler
handler_cls: type[logging.Handler] = logging.StreamHandler
else:
fileobj = filename
handler_cls = logging.FileHandler

View file

@ -1,4 +1,6 @@
"""Module allowing for ``python -m flake8 ...``."""
from __future__ import annotations
from flake8.main.cli import main
if __name__ == "__main__":

View file

@ -1,4 +1,6 @@
"""Expose backports in a single place."""
from __future__ import annotations
import sys
if sys.version_info >= (3, 8): # pragma: no cover (PY38+)

View file

@ -3,3 +3,4 @@
This is the only submodule in Flake8 with a guaranteed stable API. All other
submodules are considered internal only and are subject to change.
"""
from __future__ import annotations

View file

@ -3,19 +3,17 @@
Previously, users would import :func:`get_style_guide` from ``flake8.engine``.
In 3.0 we no longer have an "engine" module but we maintain the API from it.
"""
from __future__ import annotations
import argparse
import logging
import os.path
from typing import Any
from typing import List
from typing import Optional
from typing import Type
import flake8
from flake8.discover_files import expand_paths
from flake8.formatting import base as formatter
from flake8.main import application as app
from flake8.options import config
from flake8.options.parse_args import parse_args
LOG = logging.getLogger(__name__)
@ -53,7 +51,7 @@ class Report:
"""Return the total number of errors."""
return self._application.result_count
def get_statistics(self, violation: str) -> List[str]:
def get_statistics(self, violation: str) -> list[str]:
"""Get the list of occurrences of a violation.
:returns:
@ -97,12 +95,12 @@ class StyleGuide:
return self._application.options
@property
def paths(self) -> List[str]:
def paths(self) -> list[str]:
"""Return the extra arguments passed as paths."""
assert self._application.options is not None
return self._application.options.filenames
def check_files(self, paths: Optional[List[str]] = None) -> Report:
def check_files(self, paths: list[str] | None = None) -> Report:
"""Run collected checks on the files provided.
This will check the files passed in and return a :class:`Report`
@ -119,7 +117,7 @@ class StyleGuide:
self._application.report_errors()
return Report(self._application)
def excluded(self, filename: str, parent: Optional[str] = None) -> bool:
def excluded(self, filename: str, parent: str | None = None) -> bool:
"""Determine if a file is excluded.
:param filename:
@ -137,7 +135,6 @@ class StyleGuide:
stdin_display_name=self.options.stdin_display_name,
filename_patterns=self.options.filename,
exclude=self.options.exclude,
is_running_from_diff=self.options.diff,
)
)
return not paths
@ -148,7 +145,7 @@ class StyleGuide:
def init_report(
self,
reporter: Optional[Type[formatter.BaseFormatter]] = None,
reporter: type[formatter.BaseFormatter] | None = None,
) -> None:
"""Set up a formatter for this run of Flake8."""
if reporter is None:
@ -165,14 +162,14 @@ class StyleGuide:
# Stop cringing... I know it's gross.
self._application.make_guide()
self._application.file_checker_manager = None
self._application.make_file_checker_manager()
self._application.make_file_checker_manager([])
def input_file(
self,
filename: str,
lines: Optional[Any] = None,
expected: Optional[Any] = None,
line_offset: Optional[Any] = 0,
lines: Any | None = None,
expected: Any | None = None,
line_offset: Any | None = 0,
) -> Report:
"""Run collected checks on a single file.
@ -202,23 +199,7 @@ def get_style_guide(**kwargs: Any) -> StyleGuide:
An initialized StyleGuide
"""
application = app.Application()
prelim_opts, remaining_args = application.parse_preliminary_options([])
flake8.configure_logging(prelim_opts.verbose, prelim_opts.output_file)
cfg, cfg_dir = config.load_config(
config=prelim_opts.config,
extra=prelim_opts.append_config,
isolated=prelim_opts.isolated,
)
application.find_plugins(
cfg,
cfg_dir,
enable_extensions=prelim_opts.enable_extensions,
require_plugins=prelim_opts.require_plugins,
)
application.register_plugin_options()
application.parse_configuration_and_cli(cfg, cfg_dir, remaining_args)
application.plugins, application.options = parse_args([])
# We basically want application.initialize to be called but with these
# options set instead before we make our formatter, notifier, internal
# style guide and file checker manager.
@ -231,5 +212,5 @@ def get_style_guide(**kwargs: Any) -> StyleGuide:
LOG.error('Could not update option "%s"', key)
application.make_formatter()
application.make_guide()
application.make_file_checker_manager()
application.make_file_checker_manager([])
return StyleGuide(application)

View file

@ -1,16 +1,19 @@
"""Checker Manager and Checker classes."""
from __future__ import annotations
import argparse
import collections
import contextlib
import errno
import itertools
import logging
import multiprocessing.pool
import operator
import signal
import tokenize
from typing import Any
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from flake8 import defaults
@ -18,6 +21,7 @@ from flake8 import exceptions
from flake8 import processor
from flake8 import utils
from flake8.discover_files import expand_paths
from flake8.options.parse_args import parse_args
from flake8.plugins.finder import Checkers
from flake8.plugins.finder import LoadedPlugin
from flake8.style_guide import StyleGuideManager
@ -41,6 +45,41 @@ SERIAL_RETRY_ERRNOS = {
# noise in diffs.
}
_mp_plugins: Checkers
_mp_options: argparse.Namespace
@contextlib.contextmanager
def _mp_prefork(
plugins: Checkers, options: argparse.Namespace
) -> Generator[None, None, None]:
# we can save significant startup work w/ `fork` multiprocessing
global _mp_plugins, _mp_options
_mp_plugins, _mp_options = plugins, options
try:
yield
finally:
del _mp_plugins, _mp_options
def _mp_init(argv: Sequence[str]) -> None:
global _mp_plugins, _mp_options
# Ensure correct signaling of ^C using multiprocessing.Pool.
signal.signal(signal.SIGINT, signal.SIG_IGN)
try:
_mp_plugins, _mp_options # for `fork` this'll already be set
except NameError:
plugins, options = parse_args(argv)
_mp_plugins, _mp_options = plugins.checkers, options
def _mp_run(filename: str) -> tuple[str, Results, dict[str, int]]:
return FileChecker(
filename=filename, plugins=_mp_plugins, options=_mp_options
).run_checks()
class Manager:
"""Manage the parallelism and checker instances for each plugin and file.
@ -65,49 +104,36 @@ class Manager:
self,
style_guide: StyleGuideManager,
plugins: Checkers,
argv: Sequence[str],
) -> None:
"""Initialize our Manager instance."""
self.style_guide = style_guide
self.options = style_guide.options
self.plugins = plugins
self.jobs = self._job_count()
self._all_checkers: List[FileChecker] = []
self.checkers: List[FileChecker] = []
self.statistics = {
"files": 0,
"logical lines": 0,
"physical lines": 0,
"tokens": 0,
}
self.exclude = tuple(
itertools.chain(self.options.exclude, self.options.extend_exclude)
)
self.exclude = (*self.options.exclude, *self.options.extend_exclude)
self.argv = argv
self.results: list[tuple[str, Results, dict[str, int]]] = []
def _process_statistics(self) -> None:
for checker in self.checkers:
for _, _, statistics in self.results:
for statistic in defaults.STATISTIC_NAMES:
self.statistics[statistic] += checker.statistics[statistic]
self.statistics["files"] += len(self.checkers)
self.statistics[statistic] += statistics[statistic]
self.statistics["files"] += len(self.filenames)
def _job_count(self) -> int:
# First we walk through all of our error cases:
# - multiprocessing library is not present
# - we're running on windows in which case we know we have significant
# implementation issues
# - the user provided stdin and that's not something we can handle
# well
# - we're processing a diff, which again does not work well with
# multiprocessing and which really shouldn't require multiprocessing
# - the user provided some awful input
# class state is only preserved when using the `fork` strategy.
if multiprocessing.get_start_method() != "fork":
LOG.warning(
"The multiprocessing module is not available. "
"Ignoring --jobs arguments."
)
return 0
if utils.is_using_stdin(self.options.filenames):
LOG.warning(
"The --jobs option is not compatible with supplying "
@ -115,13 +141,6 @@ class Manager:
)
return 0
if self.options.diff:
LOG.warning(
"The --diff option was specified with --jobs but "
"they are not compatible. Ignoring --jobs arguments."
)
return 0
jobs = self.options.jobs
# If the value is "auto", we want to let the multiprocessing library
@ -152,29 +171,7 @@ class Manager:
)
return reported_results_count
def make_checkers(self, paths: Optional[List[str]] = None) -> None:
"""Create checkers for each file."""
if paths is None:
paths = self.options.filenames
self._all_checkers = [
FileChecker(
filename=filename,
plugins=self.plugins,
options=self.options,
)
for filename in expand_paths(
paths=paths,
stdin_display_name=self.options.stdin_display_name,
filename_patterns=self.options.filename,
exclude=self.exclude,
is_running_from_diff=self.options.diff,
)
]
self.checkers = [c for c in self._all_checkers if c.should_process]
LOG.info("Checking %d files", len(self.checkers))
def report(self) -> Tuple[int, int]:
def report(self) -> tuple[int, int]:
"""Report all of the errors found in the managed file checkers.
This iterates over each of the checkers and reports the errors sorted
@ -184,9 +181,9 @@ class Manager:
A tuple of the total results found and the results reported.
"""
results_reported = results_found = 0
for checker in self._all_checkers:
results = sorted(checker.results, key=lambda tup: (tup[1], tup[2]))
filename = checker.display_name
self.results.sort(key=operator.itemgetter(0))
for filename, results, _ in self.results:
results.sort(key=operator.itemgetter(1, 2))
with self.style_guide.processing_file(filename):
results_reported += self._handle_results(filename, results)
results_found += len(results)
@ -194,12 +191,8 @@ class Manager:
def run_parallel(self) -> None:
"""Run the checkers in parallel."""
# fmt: off
final_results: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] = collections.defaultdict(list) # noqa: E501
final_statistics: Dict[str, Dict[str, int]] = collections.defaultdict(dict) # noqa: E501
# fmt: on
pool = _try_initialize_processpool(self.jobs)
with _mp_prefork(self.plugins, self.options):
pool = _try_initialize_processpool(self.jobs, self.argv)
if pool is None:
self.run_serial()
@ -207,17 +200,7 @@ class Manager:
pool_closed = False
try:
pool_map = pool.imap_unordered(
_run_checks,
self.checkers,
chunksize=calculate_pool_chunksize(
len(self.checkers), self.jobs
),
)
for ret in pool_map:
filename, results, statistics = ret
final_results[filename] = results
final_statistics[filename] = statistics
self.results = list(pool.imap_unordered(_mp_run, self.filenames))
pool.close()
pool.join()
pool_closed = True
@ -226,15 +209,16 @@ class Manager:
pool.terminate()
pool.join()
for checker in self.checkers:
filename = checker.display_name
checker.results = final_results[filename]
checker.statistics = final_statistics[filename]
def run_serial(self) -> None:
"""Run the checkers in serial."""
for checker in self.checkers:
checker.run_checks()
self.results = [
FileChecker(
filename=filename,
plugins=self.plugins,
options=self.options,
).run_checks()
for filename in self.filenames
]
def run(self) -> None:
"""Run all the checkers.
@ -246,7 +230,7 @@ class Manager:
:issue:`117`) this also implements fallback to serial processing.
"""
try:
if self.jobs > 1 and len(self.checkers) > 1:
if self.jobs > 1 and len(self.filenames) > 1:
self.run_parallel()
else:
self.run_serial()
@ -254,7 +238,7 @@ class Manager:
LOG.warning("Flake8 was interrupted by the user")
raise exceptions.EarlyQuit("Early quit while running checks")
def start(self, paths: Optional[List[str]] = None) -> None:
def start(self) -> None:
"""Start checking files.
:param paths:
@ -262,7 +246,14 @@ class Manager:
:meth:`~Manager.make_checkers`.
"""
LOG.info("Making checkers")
self.make_checkers(paths)
self.filenames = tuple(
expand_paths(
paths=self.options.filenames,
stdin_display_name=self.options.stdin_display_name,
filename_patterns=self.options.filename,
exclude=self.exclude,
)
)
def stop(self) -> None:
"""Stop checking files."""
@ -301,7 +292,7 @@ class FileChecker:
"""Provide helpful debugging representation."""
return f"FileChecker for {self.filename}"
def _make_processor(self) -> Optional[processor.FileProcessor]:
def _make_processor(self) -> processor.FileProcessor | None:
try:
return processor.FileProcessor(self.filename, self.options)
except OSError as e:
@ -316,7 +307,7 @@ class FileChecker:
def report(
self,
error_code: Optional[str],
error_code: str | None,
line_number: int,
column: int,
text: str,
@ -337,7 +328,7 @@ class FileChecker:
def run_check(self, plugin: LoadedPlugin, **arguments: Any) -> Any:
"""Run the check in a single plugin."""
assert self.processor is not None
assert self.processor is not None, self.filename
try:
params = self.processor.keyword_arguments_for(
plugin.parameters, arguments
@ -361,7 +352,7 @@ class FileChecker:
)
@staticmethod
def _extract_syntax_information(exception: Exception) -> Tuple[int, int]:
def _extract_syntax_information(exception: Exception) -> tuple[int, int]:
if (
len(exception.args) > 1
and exception.args[1]
@ -421,7 +412,7 @@ class FileChecker:
def run_ast_checks(self) -> None:
"""Run all checks expecting an abstract syntax tree."""
assert self.processor is not None
assert self.processor is not None, self.filename
ast = self.processor.build_ast()
for plugin in self.plugins.tree:
@ -524,9 +515,11 @@ class FileChecker:
self.run_physical_checks(file_processor.lines[-1])
self.run_logical_checks()
def run_checks(self) -> Tuple[str, Results, Dict[str, int]]:
def run_checks(self) -> tuple[str, Results, dict[str, int]]:
"""Run checks against the file."""
assert self.processor is not None
if self.processor is None or not self.should_process:
return self.display_name, self.results, self.statistics
try:
self.run_ast_checks()
self.process_tokens()
@ -534,11 +527,11 @@ class FileChecker:
code = "E902" if isinstance(e, tokenize.TokenError) else "E999"
row, column = self._extract_syntax_information(e)
self.report(code, row, column, f"{type(e).__name__}: {e.args[0]}")
return self.filename, self.results, self.statistics
return self.display_name, self.results, self.statistics
logical_lines = self.processor.statistics["logical lines"]
self.statistics["logical lines"] = logical_lines
return self.filename, self.results, self.statistics
return self.display_name, self.results, self.statistics
def handle_newline(self, token_type: int) -> None:
"""Handle the logic when encountering a newline token."""
@ -585,17 +578,13 @@ class FileChecker:
self.run_physical_checks(line)
def _pool_init() -> None:
"""Ensure correct signaling of ^C using multiprocessing.Pool."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _try_initialize_processpool(
job_count: int,
) -> Optional[multiprocessing.pool.Pool]:
argv: Sequence[str],
) -> multiprocessing.pool.Pool | None:
"""Return a new process pool instance if we are able to create one."""
try:
return multiprocessing.Pool(job_count, _pool_init)
return multiprocessing.Pool(job_count, _mp_init, initargs=(argv,))
except OSError as err:
if err.errno not in SERIAL_RETRY_ERRNOS:
raise
@ -605,25 +594,9 @@ def _try_initialize_processpool(
return None
def calculate_pool_chunksize(num_checkers: int, num_jobs: int) -> int:
"""Determine the chunksize for the multiprocessing Pool.
- For chunksize, see: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.imap # noqa
- This formula, while not perfect, aims to give each worker two batches of
work.
- See: https://github.com/pycqa/flake8/issues/829#note_18878876
- See: https://github.com/pycqa/flake8/issues/197
"""
return max(num_checkers // (num_jobs * 2), 1)
def _run_checks(checker: FileChecker) -> Tuple[str, Results, Dict[str, int]]:
return checker.run_checks()
def find_offset(
offset: int, mapping: processor._LogicalMapping
) -> Tuple[int, int]:
) -> tuple[int, int]:
"""Find the offset tuple for a single offset."""
if isinstance(offset, tuple):
return offset

View file

@ -1,4 +1,6 @@
"""Constants that define defaults."""
from __future__ import annotations
import re
EXCLUDE = (
@ -41,3 +43,5 @@ NOQA_INLINE_REGEXP = re.compile(
)
NOQA_FILE = re.compile(r"\s*# flake8[:=]\s*noqa", re.I)
VALID_CODE_PREFIX = re.compile("^[A-Z]{1,3}[0-9]{0,3}$", re.ASCII)

View file

@ -1,4 +1,6 @@
"""Functions related to discovering paths."""
from __future__ import annotations
import logging
import os.path
from typing import Callable
@ -53,7 +55,6 @@ def expand_paths(
stdin_display_name: str,
filename_patterns: Sequence[str],
exclude: Sequence[str],
is_running_from_diff: bool,
) -> Generator[str, None, None]:
"""Expand out ``paths`` from commandline to the lintable files."""
if not paths:
@ -73,24 +74,16 @@ def expand_paths(
logger=LOG,
)
def is_included(arg: str, fname: str) -> bool:
# while running from a diff, the arguments aren't _explicitly_
# listed so we still filter them
if is_running_from_diff:
return utils.fnmatch(fname, filename_patterns)
else:
return (
# always lint `-`
fname == "-"
# always lint explicitly passed (even if not matching filter)
or arg == fname
# otherwise, check the file against filtered patterns
or utils.fnmatch(fname, filename_patterns)
)
return (
filename
for path in paths
for filename in _filenames_from(path, predicate=is_excluded)
if is_included(path, filename)
if (
# always lint `-`
filename == "-"
# always lint explicitly passed (even if not matching filter)
or path == filename
# otherwise, check the file against filtered patterns
or utils.fnmatch(filename, filename_patterns)
)
)

View file

@ -1,4 +1,5 @@
"""Exception classes for all of Flake8."""
from __future__ import annotations
class Flake8Exception(Exception):

View file

@ -1 +1,2 @@
"""Submodule containing the default formatters for Flake8."""
from __future__ import annotations

View file

@ -2,6 +2,8 @@
See: https://github.com/pre-commit/pre-commit/blob/cb40e96/pre_commit/color.py
"""
from __future__ import annotations
import sys
if sys.platform == "win32": # pragma: no cover (windows)

View file

@ -1,11 +1,10 @@
"""The base class and interface for all formatting plugins."""
from __future__ import annotations
import argparse
import os
import sys
from typing import IO
from typing import List
from typing import Optional
from typing import Tuple
from flake8.formatting import _windows_color
from flake8.statistics import Statistics
@ -46,7 +45,7 @@ class BaseFormatter:
"""
self.options = options
self.filename = options.output_file
self.output_fd: Optional[IO[str]] = None
self.output_fd: IO[str] | None = None
self.newline = "\n"
self.color = options.color == "always" or (
options.color == "auto"
@ -84,7 +83,7 @@ class BaseFormatter:
os.makedirs(dirname, exist_ok=True)
self.output_fd = open(self.filename, "a")
def handle(self, error: "Violation") -> None:
def handle(self, error: Violation) -> None:
"""Handle an error reported by Flake8.
This defaults to calling :meth:`format`, :meth:`show_source`, and
@ -99,7 +98,7 @@ class BaseFormatter:
source = self.show_source(error)
self.write(line, source)
def format(self, error: "Violation") -> Optional[str]:
def format(self, error: Violation) -> str | None:
"""Format an error reported by Flake8.
This method **must** be implemented by subclasses.
@ -114,7 +113,7 @@ class BaseFormatter:
"Subclass of BaseFormatter did not implement" " format."
)
def show_statistics(self, statistics: "Statistics") -> None:
def show_statistics(self, statistics: Statistics) -> None:
"""Format and print the statistics."""
for error_code in statistics.error_codes():
stats_for_error_code = statistics.statistics_for(error_code)
@ -123,7 +122,7 @@ class BaseFormatter:
count += sum(stat.count for stat in stats_for_error_code)
self._write(f"{count:<5} {error_code} {statistic.message}")
def show_benchmarks(self, benchmarks: List[Tuple[str, float]]) -> None:
def show_benchmarks(self, benchmarks: list[tuple[str, float]]) -> None:
"""Format and print the benchmarks."""
# NOTE(sigmavirus24): The format strings are a little confusing, even
# to me, so here's a quick explanation:
@ -144,7 +143,7 @@ class BaseFormatter:
benchmark = float_format(statistic=statistic, value=value)
self._write(benchmark)
def show_source(self, error: "Violation") -> Optional[str]:
def show_source(self, error: Violation) -> str | None:
"""Show the physical line generating the error.
This also adds an indicator for the particular part of the line that
@ -178,7 +177,7 @@ class BaseFormatter:
if self.output_fd is None or self.options.tee:
sys.stdout.buffer.write(output.encode() + self.newline.encode())
def write(self, line: Optional[str], source: Optional[str]) -> None:
def write(self, line: str | None, source: str | None) -> None:
"""Write the line either to the output file or stdout.
This handles deciding whether to write to a file or print to standard

View file

@ -1,6 +1,5 @@
"""Default formatting class for Flake8."""
from typing import Optional
from typing import Set
from __future__ import annotations
from flake8.formatting import base
from flake8.violation import Violation
@ -38,7 +37,7 @@ class SimpleFormatter(base.BaseFormatter):
error_format: str
def format(self, error: "Violation") -> Optional[str]:
def format(self, error: Violation) -> str | None:
"""Format and write error out.
If an output filename is specified, write formatted errors to that
@ -86,12 +85,12 @@ class FilenameOnly(SimpleFormatter):
def after_init(self) -> None:
"""Initialize our set of filenames."""
self.filenames_already_printed: Set[str] = set()
self.filenames_already_printed: set[str] = set()
def show_source(self, error: "Violation") -> Optional[str]:
def show_source(self, error: Violation) -> str | None:
"""Do not include the source code."""
def format(self, error: "Violation") -> Optional[str]:
def format(self, error: Violation) -> str | None:
"""Ensure we only print each error once."""
if error.filename not in self.filenames_already_printed:
self.filenames_already_printed.add(error.filename)
@ -103,8 +102,8 @@ class FilenameOnly(SimpleFormatter):
class Nothing(base.BaseFormatter):
"""Print absolutely nothing."""
def format(self, error: "Violation") -> Optional[str]:
def format(self, error: Violation) -> str | None:
"""Do nothing."""
def show_source(self, error: "Violation") -> Optional[str]:
def show_source(self, error: Violation) -> str | None:
"""Do not print the source."""

View file

@ -1 +1,2 @@
"""Module containing the logic for the Flake8 entry-points."""
from __future__ import annotations

View file

@ -1,28 +1,20 @@
"""Module containing the application logic for Flake8."""
from __future__ import annotations
import argparse
import configparser
import json
import logging
import time
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
import flake8
from flake8 import checker
from flake8 import defaults
from flake8 import exceptions
from flake8 import style_guide
from flake8 import utils
from flake8.formatting.base import BaseFormatter
from flake8.main import debug
from flake8.main import options
from flake8.options import aggregator
from flake8.options import config
from flake8.options import manager
from flake8.options.parse_args import parse_args
from flake8.plugins import finder
from flake8.plugins import reporter
@ -38,27 +30,21 @@ class Application:
#: The timestamp when the Application instance was instantiated.
self.start_time = time.time()
#: The timestamp when the Application finished reported errors.
self.end_time: Optional[float] = None
#: The prelimary argument parser for handling options required for
#: obtaining and parsing the configuration file.
self.prelim_arg_parser = options.stage1_arg_parser()
#: The instance of :class:`flake8.options.manager.OptionManager` used
#: to parse and handle the options and arguments passed by the user
self.option_manager: Optional[manager.OptionManager] = None
self.end_time: float | None = None
self.plugins: Optional[finder.Plugins] = None
self.plugins: finder.Plugins | None = None
#: The user-selected formatter from :attr:`formatting_plugins`
self.formatter: Optional[BaseFormatter] = None
self.formatter: BaseFormatter | None = None
#: The :class:`flake8.style_guide.StyleGuideManager` built from the
#: user's options
self.guide: Optional[style_guide.StyleGuideManager] = None
self.guide: style_guide.StyleGuideManager | None = None
#: The :class:`flake8.checker.Manager` that will handle running all of
#: the checks selected by the user.
self.file_checker_manager: Optional[checker.Manager] = None
self.file_checker_manager: checker.Manager | None = None
#: The user-supplied options parsed into an instance of
#: :class:`argparse.Namespace`
self.options: Optional[argparse.Namespace] = None
self.options: argparse.Namespace | None = None
#: The number of errors, warnings, and other messages after running
#: flake8 and taking into account ignored errors and lines.
self.result_count = 0
@ -69,33 +55,6 @@ class Application:
#: with a non-zero status code
self.catastrophic_failure = False
#: The parsed diff information
self.parsed_diff: Dict[str, Set[int]] = {}
def parse_preliminary_options(
self, argv: Sequence[str]
) -> Tuple[argparse.Namespace, List[str]]:
"""Get preliminary options from the CLI, pre-plugin-loading.
We need to know the values of a few standard options so that we can
locate configuration files and configure logging.
Since plugins aren't loaded yet, there may be some as-yet-unknown
options; we ignore those for now, they'll be parsed later when we do
real option parsing.
:param argv:
Command-line arguments passed in directly.
:returns:
Populated namespace and list of remaining argument strings.
"""
args, rest = self.prelim_arg_parser.parse_known_args(argv)
# XXX (ericvw): Special case "forwarding" the output file option so
# that it can be reparsed again for the BaseFormatter.filename.
if args.output_file:
rest.extend(("--output-file", args.output_file))
return args, rest
def exit_code(self) -> int:
"""Return the program exit code."""
if self.catastrophic_failure:
@ -106,82 +65,6 @@ class Application:
else:
return int(self.result_count > 0)
def find_plugins(
self,
cfg: configparser.RawConfigParser,
cfg_dir: str,
*,
enable_extensions: Optional[str],
require_plugins: Optional[str],
) -> None:
"""Find and load the plugins for this application.
Set :attr:`plugins` based on loaded plugins.
"""
opts = finder.parse_plugin_options(
cfg,
cfg_dir,
enable_extensions=enable_extensions,
require_plugins=require_plugins,
)
raw = finder.find_plugins(cfg, opts)
self.plugins = finder.load_plugins(raw, opts)
def register_plugin_options(self) -> None:
"""Register options provided by plugins to our option manager."""
assert self.plugins is not None
self.option_manager = manager.OptionManager(
version=flake8.__version__,
plugin_versions=self.plugins.versions_str(),
parents=[self.prelim_arg_parser],
)
options.register_default_options(self.option_manager)
self.option_manager.register_plugins(self.plugins)
def parse_configuration_and_cli(
self,
cfg: configparser.RawConfigParser,
cfg_dir: str,
argv: List[str],
) -> None:
"""Parse configuration files and the CLI options."""
assert self.option_manager is not None
assert self.plugins is not None
self.options = aggregator.aggregate_options(
self.option_manager,
cfg,
cfg_dir,
argv,
)
if self.options.bug_report:
info = debug.information(flake8.__version__, self.plugins)
print(json.dumps(info, indent=2, sort_keys=True))
raise SystemExit(0)
if self.options.diff:
LOG.warning(
"the --diff option is deprecated and will be removed in a "
"future version."
)
self.parsed_diff = utils.parse_unified_diff()
for loaded in self.plugins.all_plugins():
parse_options = getattr(loaded.obj, "parse_options", None)
if parse_options is None:
continue
# XXX: ideally we wouldn't have two forms of parse_options
try:
parse_options(
self.option_manager,
self.options,
self.options.filenames,
)
except TypeError:
parse_options(self.options)
def make_formatter(self) -> None:
"""Initialize a formatter based on the parsed options."""
assert self.plugins is not None
@ -196,16 +79,14 @@ class Application:
self.options, self.formatter
)
if self.options.diff:
self.guide.add_diff_ranges(self.parsed_diff)
def make_file_checker_manager(self) -> None:
def make_file_checker_manager(self, argv: Sequence[str]) -> None:
"""Initialize our FileChecker Manager."""
assert self.guide is not None
assert self.plugins is not None
self.file_checker_manager = checker.Manager(
style_guide=self.guide,
plugins=self.plugins.checkers,
argv=argv,
)
def run_checks(self) -> None:
@ -215,16 +96,9 @@ class Application:
:class:`~flake8.checker.Manger` instance run the checks it is
managing.
"""
assert self.options is not None
assert self.file_checker_manager is not None
if self.options.diff:
files: Optional[List[str]] = sorted(self.parsed_diff)
if not files:
return
else:
files = None
self.file_checker_manager.start(files)
self.file_checker_manager.start()
try:
self.file_checker_manager.run()
except exceptions.PluginExecutionFailed as plugin_failed:
@ -288,28 +162,16 @@ class Application:
This finds the plugins, registers their options, and parses the
command-line arguments.
"""
# NOTE(sigmavirus24): When updating this, make sure you also update
# our legacy API calls to these same methods.
prelim_opts, remaining_args = self.parse_preliminary_options(argv)
flake8.configure_logging(prelim_opts.verbose, prelim_opts.output_file)
self.plugins, self.options = parse_args(argv)
cfg, cfg_dir = config.load_config(
config=prelim_opts.config,
extra=prelim_opts.append_config,
isolated=prelim_opts.isolated,
)
if self.options.bug_report:
info = debug.information(flake8.__version__, self.plugins)
print(json.dumps(info, indent=2, sort_keys=True))
raise SystemExit(0)
self.find_plugins(
cfg,
cfg_dir,
enable_extensions=prelim_opts.enable_extensions,
require_plugins=prelim_opts.require_plugins,
)
self.register_plugin_options()
self.parse_configuration_and_cli(cfg, cfg_dir, remaining_args)
self.make_formatter()
self.make_guide()
self.make_file_checker_manager()
self.make_file_checker_manager(argv)
def report(self) -> None:
"""Report errors, statistics, and benchmarks."""

View file

@ -1,12 +1,13 @@
"""Command-line implementation of flake8."""
from __future__ import annotations
import sys
from typing import Optional
from typing import Sequence
from flake8.main import application
def main(argv: Optional[Sequence[str]] = None) -> int:
def main(argv: Sequence[str] | None = None) -> int:
"""Execute the main bit of the application.
This handles the creation of an instance of :class:`Application`, runs it,

View file

@ -1,12 +1,13 @@
"""Module containing the logic for our debugging logic."""
from __future__ import annotations
import platform
from typing import Any
from typing import Dict
from flake8.plugins.finder import Plugins
def information(version: str, plugins: Plugins) -> Dict[str, Any]:
def information(version: str, plugins: Plugins) -> dict[str, Any]:
"""Generate the information to be printed for the bug report."""
versions = sorted(
{

View file

@ -1,4 +1,6 @@
"""Contains the logic for all of the default options for Flake8."""
from __future__ import annotations
import argparse
from flake8 import defaults
@ -112,7 +114,6 @@ def register_default_options(option_manager: OptionManager) -> None:
- ``-q``/``--quiet``
- ``--color``
- ``--count``
- ``--diff``
- ``--exclude``
- ``--extend-exclude``
- ``--filename``
@ -157,15 +158,8 @@ def register_default_options(option_manager: OptionManager) -> None:
"--count",
action="store_true",
parse_from_config=True,
help="Print total number of errors to standard output and "
"set the exit code to 1 if total is not empty.",
)
add_option(
"--diff",
action="store_true",
help="(DEPRECATED) Report changes only within line number ranges in "
"the unified diff provided on standard in by the user.",
help="Print total number of errors to standard output after "
"all other output.",
)
add_option(
@ -218,7 +212,15 @@ def register_default_options(option_manager: OptionManager) -> None:
metavar="format",
default="default",
parse_from_config=True,
help="Format errors according to the chosen formatter.",
help=(
f"Format errors according to the chosen formatter "
f"({', '.join(sorted(option_manager.formatter_names))}) "
f"or a format string containing %%-style "
f"mapping keys (code, col, path, row, text). "
f"For example, "
f"``--format=pylint`` or ``--format='%%(path)s %%(code)s'``. "
f"(Default: %(default)s)"
),
)
add_option(

View file

@ -10,3 +10,4 @@
to aggregate configuration into one object used by plugins and Flake8.
"""
from __future__ import annotations

View file

@ -3,10 +3,11 @@
This holds the logic that uses the collected and merged config files and
applies the user-specified command-line configuration on top of it.
"""
from __future__ import annotations
import argparse
import configparser
import logging
from typing import Optional
from typing import Sequence
from flake8.options import config
@ -19,7 +20,7 @@ def aggregate_options(
manager: OptionManager,
cfg: configparser.RawConfigParser,
cfg_dir: str,
argv: Optional[Sequence[str]],
argv: Sequence[str] | None,
) -> argparse.Namespace:
"""Aggregate and merge CLI and config file options."""
# Get defaults from the option parser

View file

@ -1,26 +1,25 @@
"""Config handling logic for Flake8."""
from __future__ import annotations
import configparser
import logging
import os.path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from flake8 import exceptions
from flake8.defaults import VALID_CODE_PREFIX
from flake8.options.manager import OptionManager
LOG = logging.getLogger(__name__)
def _stat_key(s: str) -> Tuple[int, int]:
def _stat_key(s: str) -> tuple[int, int]:
# same as what's used by samefile / samestat
st = os.stat(s)
return st.st_ino, st.st_dev
def _find_config_file(path: str) -> Optional[str]:
def _find_config_file(path: str) -> str | None:
# on windows if the homedir isn't detected this returns back `~`
home = os.path.expanduser("~")
try:
@ -55,11 +54,11 @@ def _find_config_file(path: str) -> Optional[str]:
def load_config(
config: Optional[str],
extra: List[str],
config: str | None,
extra: list[str],
*,
isolated: bool = False,
) -> Tuple[configparser.RawConfigParser, str]:
) -> tuple[configparser.RawConfigParser, str]:
"""Load the configuration given the user options.
- in ``isolated`` mode, return an empty configuration
@ -88,7 +87,10 @@ def load_config(
# TODO: remove this and replace it with configuration modifying plugins
# read the additional configs afterwards
for filename in extra:
cfg.read(filename, encoding="UTF-8")
if not cfg.read(filename, encoding="UTF-8"):
raise exceptions.ExecutionError(
f"The specified config file does not exist: {filename}"
)
return cfg, cfg_dir
@ -97,7 +99,7 @@ def parse_config(
option_manager: OptionManager,
cfg: configparser.RawConfigParser,
cfg_dir: str,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Parse and normalize the typed configuration options."""
if "flake8" not in cfg:
return {}
@ -122,6 +124,16 @@ def parse_config(
LOG.debug('Option "%s" returned value: %r', option_name, value)
final_value = option.normalize(value, cfg_dir)
if option_name in {"ignore", "extend-ignore"}:
for error_code in final_value:
if not VALID_CODE_PREFIX.match(error_code):
raise ValueError(
f"Error code {error_code!r} "
f"supplied to {option_name!r} option "
f"does not match {VALID_CODE_PREFIX.pattern!r}"
)
assert option.config_name is not None
config_dict[option.config_name] = final_value

View file

@ -1,18 +1,13 @@
"""Option handling and Option management logic."""
from __future__ import annotations
import argparse
import enum
import functools
import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Mapping
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Type
from typing import Union
from flake8 import utils
from flake8.plugins.finder import Plugins
@ -24,62 +19,13 @@ LOG = logging.getLogger(__name__)
_ARG = enum.Enum("_ARG", "NO")
_optparse_callable_map: Dict[str, Union[Type[Any], _ARG]] = {
"int": int,
"long": int,
"string": str,
"float": float,
"complex": complex,
"choice": _ARG.NO,
# optparse allows this but does not document it
"str": str,
}
class _CallbackAction(argparse.Action):
"""Shim for optparse-style callback actions."""
def __init__(
self,
*args: Any,
callback: Callable[..., Any],
callback_args: Sequence[Any] = (),
callback_kwargs: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> None:
self._callback = callback
self._callback_args = callback_args
self._callback_kwargs = callback_kwargs or {}
super().__init__(*args, **kwargs)
def __call__(
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Optional[Union[Sequence[str], str]],
option_string: Optional[str] = None,
) -> None:
if not values:
values = None
elif isinstance(values, list) and len(values) > 1:
values = tuple(values)
self._callback(
self,
option_string,
values,
parser,
*self._callback_args,
**self._callback_kwargs,
)
def _flake8_normalize(
value: str,
*args: str,
comma_separated_list: bool = False,
normalize_paths: bool = False,
) -> Union[str, List[str]]:
ret: Union[str, List[str]] = value
) -> str | list[str]:
ret: str | list[str] = value
if comma_separated_list and isinstance(ret, str):
ret = utils.parse_comma_separated_list(value)
@ -97,24 +43,19 @@ class Option:
def __init__(
self,
short_option_name: Union[str, _ARG] = _ARG.NO,
long_option_name: Union[str, _ARG] = _ARG.NO,
# Options below here are taken from the optparse.Option class
action: Union[str, Type[argparse.Action], _ARG] = _ARG.NO,
default: Union[Any, _ARG] = _ARG.NO,
type: Union[str, Callable[..., Any], _ARG] = _ARG.NO,
dest: Union[str, _ARG] = _ARG.NO,
nargs: Union[int, str, _ARG] = _ARG.NO,
const: Union[Any, _ARG] = _ARG.NO,
choices: Union[Sequence[Any], _ARG] = _ARG.NO,
help: Union[str, _ARG] = _ARG.NO,
metavar: Union[str, _ARG] = _ARG.NO,
# deprecated optparse-only options
callback: Union[Callable[..., Any], _ARG] = _ARG.NO,
callback_args: Union[Sequence[Any], _ARG] = _ARG.NO,
callback_kwargs: Union[Mapping[str, Any], _ARG] = _ARG.NO,
short_option_name: str | _ARG = _ARG.NO,
long_option_name: str | _ARG = _ARG.NO,
# Options below are taken from argparse.ArgumentParser.add_argument
required: Union[bool, _ARG] = _ARG.NO,
action: str | type[argparse.Action] | _ARG = _ARG.NO,
default: Any | _ARG = _ARG.NO,
type: Callable[..., Any] | _ARG = _ARG.NO,
dest: str | _ARG = _ARG.NO,
nargs: int | str | _ARG = _ARG.NO,
const: Any | _ARG = _ARG.NO,
choices: Sequence[Any] | _ARG = _ARG.NO,
help: str | _ARG = _ARG.NO,
metavar: str | _ARG = _ARG.NO,
required: bool | _ARG = _ARG.NO,
# Options below here are specific to Flake8
parse_from_config: bool = False,
comma_separated_list: bool = False,
@ -154,21 +95,9 @@ class Option:
:param type:
A callable to normalize the type (as is the case in
:mod:`argparse`). Deprecated: you can also pass through type
strings such as ``'int'`` which are handled by :mod:`optparse`.
:mod:`argparse`).
:param action:
Any action allowed by :mod:`argparse`. Deprecated: this also
understands the ``action='callback'`` action from :mod:`optparse`.
:param callback:
Callback used if the action is ``"callback"``. Deprecated: please
use ``action=`` instead.
:param callback_args:
Additional positional arguments to the callback callable.
Deprecated: please use ``action=`` instead (probably with
``functools.partial``).
:param callback_kwargs:
Keyword arguments to the callback callable. Deprecated: please
use ``action=`` instead (probably with ``functools.partial``).
Any action allowed by :mod:`argparse`.
The following parameters are for Flake8's option handling alone.
@ -188,37 +117,6 @@ class Option:
):
short_option_name, long_option_name = _ARG.NO, short_option_name
# optparse -> argparse `%default` => `%(default)s`
if help is not _ARG.NO and "%default" in help:
LOG.warning(
"option %s: please update `help=` text to use %%(default)s "
"instead of %%default -- this will be an error in the future",
long_option_name,
)
help = help.replace("%default", "%(default)s")
# optparse -> argparse for `callback`
if action == "callback":
LOG.warning(
"option %s: please update from optparse `action='callback'` "
"to argparse action classes -- this will be an error in the "
"future",
long_option_name,
)
action = _CallbackAction
if type is _ARG.NO:
nargs = 0
# optparse -> argparse for `type`
if isinstance(type, str):
LOG.warning(
"option %s: please update from optparse string `type=` to "
"argparse callable `type=` -- this will be an error in the "
"future",
long_option_name,
)
type = _optparse_callable_map[type]
# flake8 special type normalization
if comma_separated_list or normalize_paths:
type = functools.partial(
@ -241,13 +139,10 @@ class Option:
self.nargs = nargs
self.const = const
self.choices = choices
self.callback = callback
self.callback_args = callback_args
self.callback_kwargs = callback_kwargs
self.help = help
self.metavar = metavar
self.required = required
self.option_kwargs: Dict[str, Union[Any, _ARG]] = {
self.option_kwargs: dict[str, Any | _ARG] = {
"action": self.action,
"default": self.default,
"type": self.type,
@ -255,9 +150,6 @@ class Option:
"nargs": self.nargs,
"const": self.const,
"choices": self.choices,
"callback": self.callback,
"callback_args": self.callback_args,
"callback_kwargs": self.callback_kwargs,
"help": self.help,
"metavar": self.metavar,
"required": self.required,
@ -268,7 +160,7 @@ class Option:
self.comma_separated_list = comma_separated_list
self.normalize_paths = normalize_paths
self.config_name: Optional[str] = None
self.config_name: str | None = None
if parse_from_config:
if long_option_name is _ARG.NO:
raise ValueError(
@ -280,7 +172,7 @@ class Option:
self._opt = None
@property
def filtered_option_kwargs(self) -> Dict[str, Any]:
def filtered_option_kwargs(self) -> dict[str, Any]:
"""Return any actually-specified arguments."""
return {
k: v for k, v in self.option_kwargs.items() if v is not _ARG.NO
@ -307,7 +199,7 @@ class Option:
return value
def to_argparse(self) -> Tuple[List[str], Dict[str, Any]]:
def to_argparse(self) -> tuple[list[str], dict[str, Any]]:
"""Convert a Flake8 Option to argparse ``add_argument`` arguments."""
return self.option_args, self.filtered_option_kwargs
@ -320,20 +212,11 @@ class OptionManager:
*,
version: str,
plugin_versions: str,
parents: List[argparse.ArgumentParser],
parents: list[argparse.ArgumentParser],
formatter_names: list[str],
) -> None:
"""Initialize an instance of an OptionManager.
:param prog:
Name of the actual program (e.g., flake8).
:param version:
Version string for the program.
:param usage:
Basic usage string used by the OptionParser.
:param parents:
A list of ArgumentParser objects whose arguments should also be
included.
"""
"""Initialize an instance of an OptionManager."""
self.formatter_names = formatter_names
self.parser = argparse.ArgumentParser(
prog="flake8",
usage="%(prog)s [options] file file ...",
@ -350,17 +233,17 @@ class OptionManager:
)
self.parser.add_argument("filenames", nargs="*", metavar="filename")
self.config_options_dict: Dict[str, Option] = {}
self.options: List[Option] = []
self.extended_default_ignore: List[str] = []
self.extended_default_select: List[str] = []
self.config_options_dict: dict[str, Option] = {}
self.options: list[Option] = []
self.extended_default_ignore: list[str] = []
self.extended_default_select: list[str] = []
self._current_group: Optional[argparse._ArgumentGroup] = None
self._current_group: argparse._ArgumentGroup | None = None
# TODO: maybe make this a free function to reduce api surface area
def register_plugins(self, plugins: Plugins) -> None:
"""Register the plugin options (if needed)."""
groups: Dict[str, argparse._ArgumentGroup] = {}
groups: dict[str, argparse._ArgumentGroup] = {}
def _set_group(name: str) -> None:
try:
@ -428,8 +311,8 @@ class OptionManager:
def parse_args(
self,
args: Optional[Sequence[str]] = None,
values: Optional[argparse.Namespace] = None,
args: Sequence[str] | None = None,
values: argparse.Namespace | None = None,
) -> argparse.Namespace:
"""Proxy to calling the OptionParser's parse_args method."""
if values:

View file

@ -0,0 +1,70 @@
"""Procedure for parsing args, config, loading plugins."""
from __future__ import annotations
import argparse
from typing import Sequence
import flake8
from flake8.main import options
from flake8.options import aggregator
from flake8.options import config
from flake8.options import manager
from flake8.plugins import finder
def parse_args(
argv: Sequence[str],
) -> tuple[finder.Plugins, argparse.Namespace]:
"""Procedure for parsing args, config, loading plugins."""
prelim_parser = options.stage1_arg_parser()
args0, rest = prelim_parser.parse_known_args(argv)
# XXX (ericvw): Special case "forwarding" the output file option so
# that it can be reparsed again for the BaseFormatter.filename.
if args0.output_file:
rest.extend(("--output-file", args0.output_file))
flake8.configure_logging(args0.verbose, args0.output_file)
cfg, cfg_dir = config.load_config(
config=args0.config,
extra=args0.append_config,
isolated=args0.isolated,
)
plugin_opts = finder.parse_plugin_options(
cfg,
cfg_dir,
enable_extensions=args0.enable_extensions,
require_plugins=args0.require_plugins,
)
raw_plugins = finder.find_plugins(cfg, plugin_opts)
plugins = finder.load_plugins(raw_plugins, plugin_opts)
option_manager = manager.OptionManager(
version=flake8.__version__,
plugin_versions=plugins.versions_str(),
parents=[prelim_parser],
formatter_names=list(plugins.reporters),
)
options.register_default_options(option_manager)
option_manager.register_plugins(plugins)
opts = aggregator.aggregate_options(option_manager, cfg, cfg_dir, rest)
for loaded in plugins.all_plugins():
parse_options = getattr(loaded.obj, "parse_options", None)
if parse_options is None:
continue
# XXX: ideally we wouldn't have two forms of parse_options
try:
parse_options(
option_manager,
opts,
opts.filenames,
)
except TypeError:
parse_options(opts)
return plugins, opts

View file

@ -1 +1,2 @@
"""Submodule of built-in plugins and plugin managers."""
from __future__ import annotations

View file

@ -1,29 +1,24 @@
"""Functions related to finding and loading plugins."""
from __future__ import annotations
import configparser
import inspect
import itertools
import logging
import re
import sys
from typing import Any
from typing import Dict
from typing import FrozenSet
from typing import Generator
from typing import Iterable
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Tuple
from flake8 import utils
from flake8._compat import importlib_metadata
from flake8.defaults import VALID_CODE_PREFIX
from flake8.exceptions import ExecutionError
from flake8.exceptions import FailedToLoadPlugin
LOG = logging.getLogger(__name__)
VALID_CODE = re.compile("^[A-Z]{1,3}[0-9]{0,3}$", re.ASCII)
FLAKE8_GROUPS = frozenset(("flake8.extension", "flake8.report"))
BANNED_PLUGINS = {
@ -45,7 +40,7 @@ class LoadedPlugin(NamedTuple):
plugin: Plugin
obj: Any
parameters: Dict[str, bool]
parameters: dict[str, bool]
@property
def entry_name(self) -> str:
@ -61,17 +56,17 @@ class LoadedPlugin(NamedTuple):
class Checkers(NamedTuple):
"""Classified plugins needed for checking."""
tree: List[LoadedPlugin]
logical_line: List[LoadedPlugin]
physical_line: List[LoadedPlugin]
tree: list[LoadedPlugin]
logical_line: list[LoadedPlugin]
physical_line: list[LoadedPlugin]
class Plugins(NamedTuple):
"""Classified plugins."""
checkers: Checkers
reporters: Dict[str, LoadedPlugin]
disabled: List[LoadedPlugin]
reporters: dict[str, LoadedPlugin]
disabled: list[LoadedPlugin]
def all_plugins(self) -> Generator[LoadedPlugin, None, None]:
"""Return an iterator over all :class:`LoadedPlugin`s."""
@ -96,12 +91,12 @@ class Plugins(NamedTuple):
class PluginOptions(NamedTuple):
"""Options related to plugin loading."""
local_plugin_paths: Tuple[str, ...]
enable_extensions: FrozenSet[str]
require_plugins: FrozenSet[str]
local_plugin_paths: tuple[str, ...]
enable_extensions: frozenset[str]
require_plugins: frozenset[str]
@classmethod
def blank(cls) -> "PluginOptions":
def blank(cls) -> PluginOptions:
"""Make a blank PluginOptions, mostly used for tests."""
return cls(
local_plugin_paths=(),
@ -113,8 +108,8 @@ class PluginOptions(NamedTuple):
def _parse_option(
cfg: configparser.RawConfigParser,
cfg_opt_name: str,
opt: Optional[str],
) -> List[str]:
opt: str | None,
) -> list[str]:
# specified on commandline: use that
if opt is not None:
return utils.parse_comma_separated_list(opt)
@ -133,8 +128,8 @@ def parse_plugin_options(
cfg: configparser.RawConfigParser,
cfg_dir: str,
*,
enable_extensions: Optional[str],
require_plugins: Optional[str],
enable_extensions: str | None,
require_plugins: str | None,
) -> PluginOptions:
"""Parse plugin loading related options."""
paths_s = cfg.get("flake8:local-plugins", "paths", fallback="").strip()
@ -231,8 +226,8 @@ def _find_local_plugins(
def _check_required_plugins(
plugins: List[Plugin],
expected: FrozenSet[str],
plugins: list[Plugin],
expected: frozenset[str],
) -> None:
plugin_names = {
utils.normalize_pypi_name(plugin.package) for plugin in plugins
@ -252,7 +247,7 @@ def _check_required_plugins(
def find_plugins(
cfg: configparser.RawConfigParser,
opts: PluginOptions,
) -> List[Plugin]:
) -> list[Plugin]:
"""Discovers all plugins (but does not load them)."""
ret = [*_find_importlib_plugins(), *_find_local_plugins(cfg)]
@ -264,7 +259,7 @@ def find_plugins(
return ret
def _parameters_for(func: Any) -> Dict[str, bool]:
def _parameters_for(func: Any) -> dict[str, bool]:
"""Return the parameters for the plugin.
This will inspect the plugin and return either the function parameters
@ -305,15 +300,15 @@ def _load_plugin(plugin: Plugin) -> LoadedPlugin:
def _import_plugins(
plugins: List[Plugin],
plugins: list[Plugin],
opts: PluginOptions,
) -> List[LoadedPlugin]:
) -> list[LoadedPlugin]:
sys.path.extend(opts.local_plugin_paths)
return [_load_plugin(p) for p in plugins]
def _classify_plugins(
plugins: List[LoadedPlugin],
plugins: list[LoadedPlugin],
opts: PluginOptions,
) -> Plugins:
tree = []
@ -340,10 +335,10 @@ def _classify_plugins(
raise NotImplementedError(f"what plugin type? {loaded}")
for loaded in itertools.chain(tree, logical_line, physical_line):
if not VALID_CODE.match(loaded.entry_name):
if not VALID_CODE_PREFIX.match(loaded.entry_name):
raise ExecutionError(
f"plugin code for `{loaded.display_name}` does not match "
f"{VALID_CODE.pattern}"
f"{VALID_CODE_PREFIX.pattern}"
)
return Plugins(
@ -358,7 +353,7 @@ def _classify_plugins(
def load_plugins(
plugins: List[Plugin],
plugins: list[Plugin],
opts: PluginOptions,
) -> Plugins:
"""Load and classify all flake8 plugins.

View file

@ -1,8 +1,9 @@
"""Generated using ./bin/gen-pycodestyle-plugin."""
# fmt: off
from __future__ import annotations
from typing import Any
from typing import Generator
from typing import Tuple
from pycodestyle import ambiguous_identifier as _ambiguous_identifier
from pycodestyle import bare_except as _bare_except
@ -60,7 +61,7 @@ def pycodestyle_logical(
previous_unindented_logical_line: Any,
tokens: Any,
verbose: Any,
) -> Generator[Tuple[int, str], None, None]:
) -> Generator[tuple[int, str], None, None]:
"""Run pycodestyle logical checks."""
yield from _ambiguous_identifier(logical_line, tokens)
yield from _bare_except(logical_line, noqa)
@ -104,7 +105,7 @@ def pycodestyle_physical(
noqa: Any,
physical_line: Any,
total_lines: Any,
) -> Generator[Tuple[int, str], None, None]:
) -> Generator[tuple[int, str], None, None]:
"""Run pycodestyle physical checks."""
ret = _maximum_line_length(physical_line, max_line_length, multiline, line_number, noqa) # noqa: E501
if ret is not None:

View file

@ -1,13 +1,12 @@
"""Plugin built-in to Flake8 to treat pyflakes as a plugin."""
from __future__ import annotations
import argparse
import ast
import os
import tokenize
from typing import Any
from typing import Generator
from typing import List
from typing import Tuple
from typing import Type
import pyflakes.checker
@ -68,13 +67,13 @@ class FlakesChecker(pyflakes.checker.Checker):
"""Subclass the Pyflakes checker to conform with the flake8 API."""
with_doctest = False
include_in_doctest: List[str] = []
exclude_from_doctest: List[str] = []
include_in_doctest: list[str] = []
exclude_from_doctest: list[str] = []
def __init__(
self,
tree: ast.AST,
file_tokens: List[tokenize.TokenInfo],
file_tokens: list[tokenize.TokenInfo],
filename: str,
) -> None:
"""Initialize the PyFlakes plugin with an AST tree and filename."""
@ -91,13 +90,13 @@ class FlakesChecker(pyflakes.checker.Checker):
for exclude in self.exclude_from_doctest:
if exclude != "" and filename.startswith(exclude):
with_doctest = False
overlaped_by = [
overlapped_by = [
include
for include in included_by
if include.startswith(exclude)
]
if overlaped_by:
if overlapped_by:
with_doctest = True
super().__init__(
@ -180,7 +179,7 @@ class FlakesChecker(pyflakes.checker.Checker):
f"both for doctesting."
)
def run(self) -> Generator[Tuple[int, int, str, Type[Any]], None, None]:
def run(self) -> Generator[tuple[int, int, str, type[Any]], None, None]:
"""Run the plugin."""
for message in self.messages:
col = getattr(message, "col", 0)

View file

@ -1,7 +1,8 @@
"""Functions for construcing the requested report plugin."""
"""Functions for constructing the requested report plugin."""
from __future__ import annotations
import argparse
import logging
from typing import Dict
from flake8.formatting.base import BaseFormatter
from flake8.plugins.finder import LoadedPlugin
@ -10,7 +11,7 @@ LOG = logging.getLogger(__name__)
def make(
reporters: Dict[str, LoadedPlugin],
reporters: dict[str, LoadedPlugin],
options: argparse.Namespace,
) -> BaseFormatter:
"""Make the formatter from the requested user options.

View file

@ -1,14 +1,14 @@
"""Module containing our file processor that tokenizes a file for checks."""
from __future__ import annotations
import argparse
import ast
import contextlib
import logging
import tokenize
from typing import Any
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Tuple
from flake8 import defaults
@ -27,7 +27,7 @@ _Logical = Tuple[List[str], List[str], _LogicalMapping]
class FileProcessor:
"""Processes a file and holdes state.
"""Processes a file and holds state.
This processes a file by generating tokens, logical and physical lines,
and AST trees. This also provides a way of passing state about the file
@ -61,9 +61,9 @@ class FileProcessor:
self,
filename: str,
options: argparse.Namespace,
lines: Optional[List[str]] = None,
lines: list[str] | None = None,
) -> None:
"""Initialice our file processor.
"""Initialize our file processor.
:param filename: Name of the file to process
"""
@ -78,13 +78,13 @@ class FileProcessor:
#: Number of blank lines
self.blank_lines = 0
#: Checker states for each plugin?
self._checker_states: Dict[str, Dict[Any, Any]] = {}
self._checker_states: dict[str, dict[Any, Any]] = {}
#: Current checker state
self.checker_state: Dict[Any, Any] = {}
self.checker_state: dict[Any, Any] = {}
#: User provided option for hang closing
self.hang_closing = options.hang_closing
#: Character used for indentation
self.indent_char: Optional[str] = None
self.indent_char: str | None = None
#: Current level of indentation
self.indent_level = 0
#: Number of spaces used for indentation
@ -106,19 +106,19 @@ class FileProcessor:
#: Previous unindented (i.e. top-level) logical line
self.previous_unindented_logical_line = ""
#: Current set of tokens
self.tokens: List[tokenize.TokenInfo] = []
self.tokens: list[tokenize.TokenInfo] = []
#: Total number of lines in the file
self.total_lines = len(self.lines)
#: Verbosity level of Flake8
self.verbose = options.verbose
#: Statistics dictionary
self.statistics = {"logical lines": 0}
self._file_tokens: Optional[List[tokenize.TokenInfo]] = None
self._file_tokens: list[tokenize.TokenInfo] | None = None
# map from line number to the line we'll search for `noqa` in
self._noqa_line_mapping: Optional[Dict[int, str]] = None
self._noqa_line_mapping: dict[int, str] | None = None
@property
def file_tokens(self) -> List[tokenize.TokenInfo]:
def file_tokens(self) -> list[tokenize.TokenInfo]:
"""Return the complete set of tokens for a file."""
if self._file_tokens is None:
line_iter = iter(self.lines)
@ -217,7 +217,7 @@ class FileProcessor:
"""Build an abstract syntax tree from the list of lines."""
return ast.parse("".join(self.lines))
def build_logical_line(self) -> Tuple[str, str, _LogicalMapping]:
def build_logical_line(self) -> tuple[str, str, _LogicalMapping]:
"""Build a logical line from the current tokens list."""
comments, logical, mapping_list = self.build_logical_line_tokens()
joined_comments = "".join(comments)
@ -240,9 +240,9 @@ class FileProcessor:
def keyword_arguments_for(
self,
parameters: Dict[str, bool],
arguments: Dict[str, Any],
) -> Dict[str, Any]:
parameters: dict[str, bool],
arguments: dict[str, Any],
) -> dict[str, Any]:
"""Generate the keyword arguments for a list of parameters."""
ret = {}
for param, required in parameters.items():
@ -269,12 +269,12 @@ class FileProcessor:
self.tokens.append(token)
yield token
def _noqa_line_range(self, min_line: int, max_line: int) -> Dict[int, str]:
def _noqa_line_range(self, min_line: int, max_line: int) -> dict[int, str]:
line_range = range(min_line, max_line + 1)
joined = "".join(self.lines[min_line - 1 : max_line])
return dict.fromkeys(line_range, joined)
def noqa_line_for(self, line_number: int) -> Optional[str]:
def noqa_line_for(self, line_number: int) -> str | None:
"""Retrieve the line which will be used to determine noqa."""
if self._noqa_line_mapping is None:
try:
@ -324,16 +324,16 @@ class FileProcessor:
self.indent_char = line[0]
return line
def read_lines(self) -> List[str]:
def read_lines(self) -> list[str]:
"""Read the lines for this file checker."""
if self.filename is None or self.filename == "-":
if self.filename == "-":
self.filename = self.options.stdin_display_name or "stdin"
lines = self.read_lines_from_stdin()
else:
lines = self.read_lines_from_filename()
return lines
def read_lines_from_filename(self) -> List[str]:
def read_lines_from_filename(self) -> list[str]:
"""Read the lines for a file."""
try:
with tokenize.open(self.filename) as fd:
@ -344,7 +344,7 @@ class FileProcessor:
with open(self.filename, encoding="latin-1") as fd:
return fd.readlines()
def read_lines_from_stdin(self) -> List[str]:
def read_lines_from_stdin(self) -> list[str]:
"""Read the lines from standard in."""
return utils.stdin_get_lines()

View file

@ -1,9 +1,8 @@
"""Statistic collection logic for Flake8."""
from typing import Dict
from __future__ import annotations
from typing import Generator
from typing import List
from typing import NamedTuple
from typing import Optional
from flake8.violation import Violation
@ -13,9 +12,9 @@ class Statistics:
def __init__(self) -> None:
"""Initialize the underlying dictionary for our statistics."""
self._store: Dict[Key, "Statistic"] = {}
self._store: dict[Key, Statistic] = {}
def error_codes(self) -> List[str]:
def error_codes(self) -> list[str]:
"""Return all unique error codes stored.
:returns:
@ -23,7 +22,7 @@ class Statistics:
"""
return sorted({key.code for key in self._store})
def record(self, error: "Violation") -> None:
def record(self, error: Violation) -> None:
"""Add the fact that the error was seen in the file.
:param error:
@ -36,8 +35,8 @@ class Statistics:
self._store[key].increment()
def statistics_for(
self, prefix: str, filename: Optional[str] = None
) -> Generator["Statistic", None, None]:
self, prefix: str, filename: str | None = None
) -> Generator[Statistic, None, None]:
"""Generate statistics for the prefix and filename.
If you have a :class:`Statistics` object that has recorded errors,
@ -79,11 +78,11 @@ class Key(NamedTuple):
code: str
@classmethod
def create_from(cls, error: "Violation") -> "Key":
def create_from(cls, error: Violation) -> Key:
"""Create a Key from :class:`flake8.violation.Violation`."""
return cls(filename=error.filename, code=error.code)
def matches(self, prefix: str, filename: Optional[str]) -> bool:
def matches(self, prefix: str, filename: str | None) -> bool:
"""Determine if this key matches some constraints.
:param prefix:
@ -118,7 +117,7 @@ class Statistic:
self.count = count
@classmethod
def create_from(cls, error: "Violation") -> "Statistic":
def create_from(cls, error: Violation) -> Statistic:
"""Create a Statistic from a :class:`flake8.violation.Violation`."""
return cls(
error_code=error.code,

View file

@ -1,19 +1,14 @@
"""Implementation of the StyleGuide used by Flake8."""
from __future__ import annotations
import argparse
import contextlib
import copy
import enum
import functools
import itertools
import logging
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union
from flake8 import defaults
from flake8 import statistics
@ -49,20 +44,20 @@ class Decision(enum.Enum):
def _explicitly_chosen(
*,
option: Optional[List[str]],
extend: Optional[List[str]],
) -> Tuple[str, ...]:
option: list[str] | None,
extend: list[str] | None,
) -> tuple[str, ...]:
ret = [*(option or []), *(extend or [])]
return tuple(sorted(ret, reverse=True))
def _select_ignore(
*,
option: Optional[List[str]],
default: Tuple[str, ...],
extended_default: List[str],
extend: Optional[List[str]],
) -> Tuple[str, ...]:
option: list[str] | None,
default: tuple[str, ...],
extended_default: list[str],
extend: list[str] | None,
) -> tuple[str, ...]:
# option was explicitly set, ignore the default and extended default
if option is not None:
ret = [*option, *(extend or [])]
@ -80,7 +75,7 @@ class DecisionEngine:
def __init__(self, options: argparse.Namespace) -> None:
"""Initialize the engine."""
self.cache: Dict[str, Decision] = {}
self.cache: dict[str, Decision] = {}
self.selected_explicitly = _explicitly_chosen(
option=options.select,
@ -104,7 +99,7 @@ class DecisionEngine:
extend=options.extend_ignore,
)
def was_selected(self, code: str) -> Union[Selected, Ignored]:
def was_selected(self, code: str) -> Selected | Ignored:
"""Determine if the code has been selected by the user.
:param code: The code for the check that has been run.
@ -122,7 +117,7 @@ class DecisionEngine:
else:
return Ignored.Implicitly
def was_ignored(self, code: str) -> Union[Selected, Ignored]:
def was_ignored(self, code: str) -> Selected | Ignored:
"""Determine if the code has been ignored by the user.
:param code:
@ -211,7 +206,7 @@ class StyleGuideManager:
self,
options: argparse.Namespace,
formatter: base_formatter.BaseFormatter,
decider: Optional[DecisionEngine] = None,
decider: DecisionEngine | None = None,
) -> None:
"""Initialize our StyleGuide.
@ -221,16 +216,14 @@ class StyleGuideManager:
self.formatter = formatter
self.stats = statistics.Statistics()
self.decider = decider or DecisionEngine(options)
self.style_guides: List[StyleGuide] = []
self.style_guides: list[StyleGuide] = []
self.default_style_guide = StyleGuide(
options, formatter, self.stats, decider=decider
)
self.style_guides = list(
itertools.chain(
[self.default_style_guide],
self.populate_style_guides_with(options),
)
)
self.style_guides = [
self.default_style_guide,
*self.populate_style_guides_with(options),
]
self.style_guide_for = functools.lru_cache(maxsize=None)(
self._style_guide_for
@ -238,7 +231,7 @@ class StyleGuideManager:
def populate_style_guides_with(
self, options: argparse.Namespace
) -> Generator["StyleGuide", None, None]:
) -> Generator[StyleGuide, None, None]:
"""Generate style guides from the per-file-ignores option.
:param options:
@ -252,7 +245,7 @@ class StyleGuideManager:
filename=filename, extend_ignore_with=violations
)
def _style_guide_for(self, filename: str) -> "StyleGuide":
def _style_guide_for(self, filename: str) -> StyleGuide:
"""Find the StyleGuide for the filename in particular."""
return max(
(g for g in self.style_guides if g.applies_to(filename)),
@ -262,7 +255,7 @@ class StyleGuideManager:
@contextlib.contextmanager
def processing_file(
self, filename: str
) -> Generator["StyleGuide", None, None]:
) -> Generator[StyleGuide, None, None]:
"""Record the fact that we're processing the file's results."""
guide = self.style_guide_for(filename)
with guide.processing_file(filename):
@ -275,7 +268,7 @@ class StyleGuideManager:
line_number: int,
column_number: int,
text: str,
physical_line: Optional[str] = None,
physical_line: str | None = None,
) -> int:
"""Handle an error reported by a check.
@ -302,18 +295,6 @@ class StyleGuideManager:
code, filename, line_number, column_number, text, physical_line
)
def add_diff_ranges(self, diffinfo: Dict[str, Set[int]]) -> None:
"""Update the StyleGuides to filter out information not in the diff.
This provides information to the underlying StyleGuides so that only
the errors in the line number ranges are reported.
:param diffinfo:
Dictionary mapping filenames to sets of line number ranges.
"""
for guide in self.style_guides:
guide.add_diff_ranges(diffinfo)
class StyleGuide:
"""Manage a Flake8 user's style guide."""
@ -323,8 +304,8 @@ class StyleGuide:
options: argparse.Namespace,
formatter: base_formatter.BaseFormatter,
stats: statistics.Statistics,
filename: Optional[str] = None,
decider: Optional[DecisionEngine] = None,
filename: str | None = None,
decider: DecisionEngine | None = None,
):
"""Initialize our StyleGuide.
@ -337,7 +318,6 @@ class StyleGuide:
self.filename = filename
if self.filename:
self.filename = utils.normalize_path(self.filename)
self._parsed_diff: Dict[str, Set[int]] = {}
def __repr__(self) -> str:
"""Make it easier to debug which StyleGuide we're using."""
@ -345,9 +325,9 @@ class StyleGuide:
def copy(
self,
filename: Optional[str] = None,
extend_ignore_with: Optional[Sequence[str]] = None,
) -> "StyleGuide":
filename: str | None = None,
extend_ignore_with: Sequence[str] | None = None,
) -> StyleGuide:
"""Create a copy of this style guide with different values."""
filename = filename or self.filename
options = copy.deepcopy(self.options)
@ -360,7 +340,7 @@ class StyleGuide:
@contextlib.contextmanager
def processing_file(
self, filename: str
) -> Generator["StyleGuide", None, None]:
) -> Generator[StyleGuide, None, None]:
"""Record the fact that we're processing the file's results."""
self.formatter.beginning(filename)
yield self
@ -405,7 +385,7 @@ class StyleGuide:
line_number: int,
column_number: int,
text: str,
physical_line: Optional[str] = None,
physical_line: str | None = None,
) -> int:
"""Handle an error reported by a check.
@ -444,20 +424,8 @@ class StyleGuide:
self.should_report_error(error.code) is Decision.Selected
)
is_not_inline_ignored = error.is_inline_ignored(disable_noqa) is False
is_included_in_diff = error.is_in(self._parsed_diff)
if error_is_selected and is_not_inline_ignored and is_included_in_diff:
if error_is_selected and is_not_inline_ignored:
self.formatter.handle(error)
self.stats.record(error)
return 1
return 0
def add_diff_ranges(self, diffinfo: Dict[str, Set[int]]) -> None:
"""Update the StyleGuide to filter out information not in the diff.
This provides information to the StyleGuide so that only the errors
in the line number ranges are reported.
:param diffinfo:
Dictionary mapping filenames to sets of line number ranges.
"""
self._parsed_diff = diffinfo

View file

@ -1,5 +1,6 @@
"""Utility methods for flake8."""
import collections
from __future__ import annotations
import fnmatch as _fnmatch
import functools
import io
@ -10,19 +11,12 @@ import re
import sys
import textwrap
import tokenize
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Pattern
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union
from flake8 import exceptions
DIFF_HUNK_REGEXP = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? @@.*$")
COMMA_SEPARATED_LIST_RE = re.compile(r"[,\s]")
LOCAL_PLUGIN_LIST_RE = re.compile(r"[,\t\n\r\f\v]")
NORMALIZE_PACKAGE_NAME_RE = re.compile(r"[-_.]+")
@ -30,7 +24,7 @@ NORMALIZE_PACKAGE_NAME_RE = re.compile(r"[-_.]+")
def parse_comma_separated_list(
value: str, regexp: Pattern[str] = COMMA_SEPARATED_LIST_RE
) -> List[str]:
) -> list[str]:
"""Parse a comma-separated list.
:param value:
@ -64,7 +58,7 @@ _FILE_LIST_TOKEN_TYPES = [
]
def _tokenize_files_to_codes_mapping(value: str) -> List[_Token]:
def _tokenize_files_to_codes_mapping(value: str) -> list[_Token]:
tokens = []
i = 0
while i < len(value):
@ -82,8 +76,8 @@ def _tokenize_files_to_codes_mapping(value: str) -> List[_Token]:
def parse_files_to_codes_mapping( # noqa: C901
value_: Union[Sequence[str], str]
) -> List[Tuple[str, List[str]]]:
value_: Sequence[str] | str,
) -> list[tuple[str, list[str]]]:
"""Parse a files-to-codes mapping.
A files-to-codes mapping a sequence of values specified as
@ -97,15 +91,15 @@ def parse_files_to_codes_mapping( # noqa: C901
else:
value = value_
ret: List[Tuple[str, List[str]]] = []
ret: list[tuple[str, list[str]]] = []
if not value.strip():
return ret
class State:
seen_sep = True
seen_colon = False
filenames: List[str] = []
codes: List[str] = []
filenames: list[str] = []
codes: list[str] = []
def _reset() -> None:
if State.codes:
@ -157,7 +151,7 @@ def parse_files_to_codes_mapping( # noqa: C901
def normalize_paths(
paths: Sequence[str], parent: str = os.curdir
) -> List[str]:
) -> list[str]:
"""Normalize a list of paths relative to a parent directory.
:returns:
@ -201,77 +195,12 @@ def stdin_get_value() -> str:
return stdin_value.decode("utf-8")
def stdin_get_lines() -> List[str]:
def stdin_get_lines() -> list[str]:
"""Return lines of stdin split according to file splitting."""
return list(io.StringIO(stdin_get_value()))
def parse_unified_diff(diff: Optional[str] = None) -> Dict[str, Set[int]]:
"""Parse the unified diff passed on stdin.
:returns:
dictionary mapping file names to sets of line numbers
"""
# Allow us to not have to patch out stdin_get_value
if diff is None:
diff = stdin_get_value()
number_of_rows = None
current_path = None
parsed_paths: Dict[str, Set[int]] = collections.defaultdict(set)
for line in diff.splitlines():
if number_of_rows:
if not line or line[0] != "-":
number_of_rows -= 1
# We're in the part of the diff that has lines starting with +, -,
# and ' ' to show context and the changes made. We skip these
# because the information we care about is the filename and the
# range within it.
# When number_of_rows reaches 0, we will once again start
# searching for filenames and ranges.
continue
# NOTE(sigmavirus24): Diffs that we support look roughly like:
# diff a/file.py b/file.py
# ...
# --- a/file.py
# +++ b/file.py
# Below we're looking for that last line. Every diff tool that
# gives us this output may have additional information after
# ``b/file.py`` which it will separate with a \t, e.g.,
# +++ b/file.py\t100644
# Which is an example that has the new file permissions/mode.
# In this case we only care about the file name.
if line[:3] == "+++":
current_path = line[4:].split("\t", 1)[0]
# NOTE(sigmavirus24): This check is for diff output from git.
if current_path[:2] == "b/":
current_path = current_path[2:]
# We don't need to do anything else. We have set up our local
# ``current_path`` variable. We can skip the rest of this loop.
# The next line we will see will give us the hung information
# which is in the next section of logic.
continue
hunk_match = DIFF_HUNK_REGEXP.match(line)
# NOTE(sigmavirus24): pep8/pycodestyle check for:
# line[:3] == '@@ '
# But the DIFF_HUNK_REGEXP enforces that the line start with that
# So we can more simply check for a match instead of slicing and
# comparing.
if hunk_match:
(row, number_of_rows) = (
1 if not group else int(group) for group in hunk_match.groups()
)
assert current_path is not None
parsed_paths[current_path].update(range(row, row + number_of_rows))
# We have now parsed our diff into a dictionary that looks like:
# {'file.py': set(range(10, 16), range(18, 20)), ...}
return parsed_paths
def is_using_stdin(paths: List[str]) -> bool:
def is_using_stdin(paths: list[str]) -> bool:
"""Determine if we're going to read from stdin.
:param paths:

View file

@ -1,12 +1,11 @@
"""Contains the Violation error class used internally."""
from __future__ import annotations
import functools
import linecache
import logging
from typing import Dict
from typing import Match
from typing import NamedTuple
from typing import Optional
from typing import Set
from flake8 import defaults
from flake8 import utils
@ -16,7 +15,7 @@ LOG = logging.getLogger(__name__)
@functools.lru_cache(maxsize=512)
def _find_noqa(physical_line: str) -> Optional[Match[str]]:
def _find_noqa(physical_line: str) -> Match[str] | None:
return defaults.NOQA_INLINE_REGEXP.search(physical_line)
@ -28,7 +27,7 @@ class Violation(NamedTuple):
line_number: int
column_number: int
text: str
physical_line: Optional[str]
physical_line: str | None
def is_inline_ignored(self, disable_noqa: bool) -> bool:
"""Determine if a comment has been added to ignore this line.
@ -68,36 +67,3 @@ class Violation(NamedTuple):
"%r is not ignored inline with ``# noqa: %s``", self, codes_str
)
return False
def is_in(self, diff: Dict[str, Set[int]]) -> bool:
"""Determine if the violation is included in a diff's line ranges.
This function relies on the parsed data added via
:meth:`~StyleGuide.add_diff_ranges`. If that has not been called and
we are not evaluating files in a diff, then this will always return
True. If there are diff ranges, then this will return True if the
line number in the error falls inside one of the ranges for the file
(and assuming the file is part of the diff data). If there are diff
ranges, this will return False if the file is not part of the diff
data or the line number of the error is not in any of the ranges of
the diff.
:returns:
True if there is no diff or if the error is in the diff's line
number ranges. False if the error's line number falls outside
the diff's line number ranges.
"""
if not diff:
return True
# NOTE(sigmavirus24): The parsed diff will be a defaultdict with
# a set as the default value (if we have received it from
# flake8.utils.parse_unified_diff). In that case ranges below
# could be an empty set (which is False-y) or if someone else
# is using this API, it could be None. If we could guarantee one
# or the other, we would check for it more explicitly.
line_numbers = diff.get(self.filename)
if not line_numbers:
return False
return self.line_number in line_numbers

View file

@ -1 +1,2 @@
"""This is here because mypy doesn't understand PEP 420."""
from __future__ import annotations

View file

@ -1,4 +1,6 @@
"""Test configuration for py.test."""
from __future__ import annotations
import sys
import flake8

View file

@ -1,130 +0,0 @@
diff --git a/flake8/utils.py b/flake8/utils.py
index f6ce384..7cd12b0 100644
--- a/flake8/utils.py
+++ b/flake8/utils.py
@@ -75,8 +75,8 @@ def stdin_get_value():
return cached_value.getvalue()
-def parse_unified_diff():
- # type: () -> List[str]
+def parse_unified_diff(diff=None):
+ # type: (str) -> List[str]
"""Parse the unified diff passed on stdin.
:returns:
@@ -84,7 +84,10 @@ def parse_unified_diff():
:rtype:
dict
"""
- diff = stdin_get_value()
+ # Allow us to not have to patch out stdin_get_value
+ if diff is None:
+ diff = stdin_get_value()
+
number_of_rows = None
current_path = None
parsed_paths = collections.defaultdict(set)
diff --git a/tests/fixtures/diffs/single_file_diff b/tests/fixtures/diffs/single_file_diff
new file mode 100644
index 0000000..77ca534
--- /dev/null
+++ b/tests/fixtures/diffs/single_file_diff
@@ -0,0 +1,27 @@
+diff --git a/flake8/utils.py b/flake8/utils.py
+index f6ce384..7cd12b0 100644
+--- a/flake8/utils.py
++++ b/flake8/utils.py
+@@ -75,8 +75,8 @@ def stdin_get_value():
+ return cached_value.getvalue()
+
+
+-def parse_unified_diff():
+- # type: () -> List[str]
++def parse_unified_diff(diff=None):
++ # type: (str) -> List[str]
+ """Parse the unified diff passed on stdin.
+
+ :returns:
+@@ -84,7 +84,10 @@ def parse_unified_diff():
+ :rtype:
+ dict
+ """
+- diff = stdin_get_value()
++ # Allow us to not have to patch out stdin_get_value
++ if diff is None:
++ diff = stdin_get_value()
++
+ number_of_rows = None
+ current_path = None
+ parsed_paths = collections.defaultdict(set)
diff --git a/tests/fixtures/diffs/two_file_diff b/tests/fixtures/diffs/two_file_diff
new file mode 100644
index 0000000..5bd35cd
--- /dev/null
+++ b/tests/fixtures/diffs/two_file_diff
@@ -0,0 +1,45 @@
+diff --git a/flake8/utils.py b/flake8/utils.py
+index f6ce384..7cd12b0 100644
+--- a/flake8/utils.py
++++ b/flake8/utils.py
+@@ -75,8 +75,8 @@ def stdin_get_value():
+ return cached_value.getvalue()
+
+
+-def parse_unified_diff():
+- # type: () -> List[str]
++def parse_unified_diff(diff=None):
++ # type: (str) -> List[str]
+ """Parse the unified diff passed on stdin.
+
+ :returns:
+@@ -84,7 +84,10 @@ def parse_unified_diff():
+ :rtype:
+ dict
+ """
+- diff = stdin_get_value()
++ # Allow us to not have to patch out stdin_get_value
++ if diff is None:
++ diff = stdin_get_value()
++
+ number_of_rows = None
+ current_path = None
+ parsed_paths = collections.defaultdict(set)
+diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
+index d69d939..21482ce 100644
+--- a/tests/unit/test_utils.py
++++ b/tests/unit/test_utils.py
+@@ -115,3 +115,13 @@ def test_parameters_for_function_plugin():
+ plugin = plugin_manager.Plugin('plugin-name', object())
+ plugin._plugin = fake_plugin
+ assert utils.parameters_for(plugin) == ['physical_line', 'self', 'tree']
++
++
++def read_diff_file(filename):
++ """Read the diff file in its entirety."""
++ with open(filename, 'r') as fd:
++ content = fd.read()
++ return content
++
++
++SINGLE_FILE_DIFF = read_diff_file('tests/fixtures/diffs/single_file_diff')
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index d69d939..1461369 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -115,3 +115,14 @@ def test_parameters_for_function_plugin():
plugin = plugin_manager.Plugin('plugin-name', object())
plugin._plugin = fake_plugin
assert utils.parameters_for(plugin) == ['physical_line', 'self', 'tree']
+
+
+def read_diff_file(filename):
+ """Read the diff file in its entirety."""
+ with open(filename, 'r') as fd:
+ content = fd.read()
+ return content
+
+
+SINGLE_FILE_DIFF = read_diff_file('tests/fixtures/diffs/single_file_diff')
+TWO_FILE_DIFF = read_diff_file('tests/fixtures/diffs/two_file_diff')

View file

@ -1,27 +0,0 @@
diff --git a/flake8/utils.py b/flake8/utils.py
index f6ce384..7cd12b0 100644
--- a/flake8/utils.py
+++ b/flake8/utils.py
@@ -75,8 +75,8 @@ def stdin_get_value():
return cached_value.getvalue()
-def parse_unified_diff():
- # type: () -> List[str]
+def parse_unified_diff(diff=None):
+ # type: (str) -> List[str]
"""Parse the unified diff passed on stdin.
:returns:
@@ -84,7 +84,10 @@ def parse_unified_diff():
:rtype:
dict
"""
- diff = stdin_get_value()
+ # Allow us to not have to patch out stdin_get_value
+ if diff is None:
+ diff = stdin_get_value()
+
number_of_rows = None
current_path = None
parsed_paths = collections.defaultdict(set)

View file

@ -1,45 +0,0 @@
diff --git a/flake8/utils.py b/flake8/utils.py
index f6ce384..7cd12b0 100644
--- a/flake8/utils.py
+++ b/flake8/utils.py
@@ -75,8 +75,8 @@ def stdin_get_value():
return cached_value.getvalue()
-def parse_unified_diff():
- # type: () -> List[str]
+def parse_unified_diff(diff=None):
+ # type: (str) -> List[str]
"""Parse the unified diff passed on stdin.
:returns:
@@ -84,7 +84,10 @@ def parse_unified_diff():
:rtype:
dict
"""
- diff = stdin_get_value()
+ # Allow us to not have to patch out stdin_get_value
+ if diff is None:
+ diff = stdin_get_value()
+
number_of_rows = None
current_path = None
parsed_paths = collections.defaultdict(set)
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index d69d939..21482ce 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -115,3 +115,13 @@ def test_parameters_for_function_plugin():
plugin = plugin_manager.Plugin('plugin-name', object())
plugin._plugin = fake_plugin
assert utils.parameters_for(plugin) == ['physical_line', 'self', 'tree']
+
+
+def read_diff_file(filename):
+ """Read the diff file in its entirety."""
+ with open(filename, 'r') as fd:
+ content = fd.read()
+ return content
+
+
+SINGLE_FILE_DIFF = read_diff_file('tests/fixtures/diffs/single_file_diff')

View file

@ -1,4 +1,5 @@
"""Module that is off sys.path by default, for testing local-plugin-paths."""
from __future__ import annotations
class ExtensionTestPlugin2:

View file

@ -1,4 +1,6 @@
"""Test aggregation of config files and command-line options."""
from __future__ import annotations
import os
import pytest
@ -16,6 +18,7 @@ def optmanager():
version="3.0.0",
plugin_versions="",
parents=[],
formatter_names=[],
)
options.register_default_options(option_manager)
return option_manager

View file

@ -1,4 +1,6 @@
"""Integration tests for the legacy api."""
from __future__ import annotations
from flake8.api import legacy

View file

@ -1,4 +1,6 @@
"""Integration tests for the checker submodule."""
from __future__ import annotations
import sys
from unittest import mock
@ -264,17 +266,12 @@ def test_report_order(results, expected_order):
# tuples to create the expected result lists from the indexes
expected_results = [results[index] for index in expected_order]
file_checker = mock.Mock(spec=["results", "display_name"])
file_checker.results = results
file_checker.display_name = "placeholder"
style_guide = mock.MagicMock(spec=["options", "processing_file"])
# Create a placeholder manager without arguments or plugins
# Just add one custom file checker which just provides the results
manager = checker.Manager(style_guide, finder.Checkers([], [], []))
manager.checkers = manager._all_checkers = [file_checker]
manager = checker.Manager(style_guide, finder.Checkers([], [], []), [])
manager.results = [("placeholder", results, {})]
# _handle_results is the first place which gets the sorted result
# Should something non-private be mocked instead?
handler = mock.Mock(side_effect=count_side_effect)
@ -293,9 +290,9 @@ def test_acquire_when_multiprocessing_pool_can_initialize():
This simulates the behaviour on most common platforms.
"""
with mock.patch("multiprocessing.Pool") as pool:
result = checker._try_initialize_processpool(2)
result = checker._try_initialize_processpool(2, [])
pool.assert_called_once_with(2, checker._pool_init)
pool.assert_called_once_with(2, checker._mp_init, initargs=([],))
assert result is pool.return_value
@ -312,9 +309,9 @@ def test_acquire_when_multiprocessing_pool_can_not_initialize():
https://github.com/python/cpython/blob/4e02981de0952f54bf87967f8e10d169d6946b40/Lib/multiprocessing/synchronize.py#L30-L33
"""
with mock.patch("multiprocessing.Pool", side_effect=ImportError) as pool:
result = checker._try_initialize_processpool(2)
result = checker._try_initialize_processpool(2, [])
pool.assert_called_once_with(2, checker._pool_init)
pool.assert_called_once_with(2, checker._mp_init, initargs=([],))
assert result is None

View file

@ -1,4 +1,6 @@
"""Integration tests for the main entrypoint of flake8."""
from __future__ import annotations
import json
import os
import sys
@ -11,42 +13,6 @@ from flake8.main import cli
from flake8.options import config
def test_diff_option(tmpdir, capsys):
"""Ensure that `flake8 --diff` works."""
t_py_contents = """\
import os
import sys # unused but not part of diff
print('(to avoid trailing whitespace in test)')
print('(to avoid trailing whitespace in test)')
print(os.path.join('foo', 'bar'))
y # part of the diff and an error
"""
diff = """\
diff --git a/t.py b/t.py
index d64ac39..7d943de 100644
--- a/t.py
+++ b/t.py
@@ -4,3 +4,5 @@ import sys # unused but not part of diff
print('(to avoid trailing whitespace in test)')
print('(to avoid trailing whitespace in test)')
print(os.path.join('foo', 'bar'))
+
+y # part of the diff and an error
"""
with mock.patch.object(utils, "stdin_get_value", return_value=diff):
with tmpdir.as_cwd():
tmpdir.join("t.py").write(t_py_contents)
assert cli.main(["--diff"]) == 1
out, err = capsys.readouterr()
assert out == "t.py:8:1: F821 undefined name 'y'\n"
assert err == ""
def test_form_feed_line_split(tmpdir, capsys):
"""Test that form feed is treated the same for stdin."""
src = "x=1\n\f\ny=1\n"
@ -132,6 +98,26 @@ t.py:1:1: F401 'os' imported but unused
assert err == ""
def test_errors_sorted(tmpdir, capsys):
with tmpdir.as_cwd():
for c in "abcde":
tmpdir.join(f"{c}.py").write("import os\n")
assert cli.main(["./"]) == 1
# file traversal was done in inode-order before
# this uses a significant number of files such that it's unlikely to pass
expected = """\
./a.py:1:1: F401 'os' imported but unused
./b.py:1:1: F401 'os' imported but unused
./c.py:1:1: F401 'os' imported but unused
./d.py:1:1: F401 'os' imported but unused
./e.py:1:1: F401 'os' imported but unused
"""
out, err = capsys.readouterr()
assert out == expected
assert err == ""
def test_extend_exclude(tmpdir, capsys):
"""Ensure that `flake8 --extend-exclude` works."""
for d in ["project", "vendor", "legacy", ".git", ".tox", ".hg"]:
@ -404,3 +390,13 @@ The specified config file does not exist: missing.cfg
out, err = capsys.readouterr()
assert out == expected
assert err == ""
def test_format_option_help(capsys):
"""Test that help displays list of available formatters."""
with pytest.raises(SystemExit):
cli.main(["--help"])
out, err = capsys.readouterr()
assert "(default, pylint, quiet-filename, quiet-nothing)" in out
assert err == ""

View file

@ -1,4 +1,6 @@
"""Integration tests for plugin loading."""
from __future__ import annotations
import pytest
from flake8.main.cli import main
@ -98,6 +100,7 @@ def test_local_plugin_can_add_option(local_config):
version="123",
plugin_versions="",
parents=[stage1_parser],
formatter_names=[],
)
register_default_options(option_manager)
option_manager.register_plugins(loaded_plugins)

View file

@ -1,4 +1,6 @@
"""Shared fixtures between unit tests."""
from __future__ import annotations
import argparse
import pytest

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import configparser
import sys
from unittest import mock
@ -29,37 +31,6 @@ def _loaded(plugin=None, obj=None, parameters=None):
return finder.LoadedPlugin(plugin, obj, parameters)
@pytest.mark.parametrize(
"s",
(
"E",
"E1",
"E123",
"ABC",
"ABC1",
"ABC123",
),
)
def test_valid_plugin_prefixes(s):
assert finder.VALID_CODE.match(s)
@pytest.mark.parametrize(
"s",
(
"",
"A1234",
"ABCD",
"abc",
"a-b",
"",
"A𝟗",
),
)
def test_invalid_plugin_prefixes(s):
assert finder.VALID_CODE.match(s) is None
def test_loaded_plugin_entry_name_vs_display_name():
loaded = _loaded(_plugin(package="package-name", ep=_ep(name="Q")))
assert loaded.entry_name == "Q"

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import importlib.machinery
import importlib.util
import os.path

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import argparse
import pytest

View file

@ -1,4 +1,6 @@
"""Tests for the Application class."""
from __future__ import annotations
import argparse
import pytest

View file

@ -1,4 +1,6 @@
"""Tests for the BaseFormatter object."""
from __future__ import annotations
import argparse
import sys
from unittest import mock

View file

@ -1,4 +1,6 @@
"""Tests for the Manager object for FileCheckers."""
from __future__ import annotations
import errno
import multiprocessing
from unittest import mock
@ -12,20 +14,15 @@ from flake8.plugins import finder
def style_guide_mock():
"""Create a mock StyleGuide object."""
return mock.MagicMock(
**{
"options.diff": False,
"options.jobs": JobsArgument("4"),
}
)
return mock.MagicMock(**{"options.jobs": JobsArgument("4")})
def _parallel_checker_manager():
"""Call Manager.run() and return the number of calls to `run_serial`."""
style_guide = style_guide_mock()
manager = checker.Manager(style_guide, finder.Checkers([], [], []))
# multiple checkers is needed for parallel mode
manager.checkers = [mock.Mock(), mock.Mock()]
manager = checker.Manager(style_guide, finder.Checkers([], [], []), [])
# multiple files is needed for parallel mode
manager.filenames = ("file1", "file2")
return manager
@ -39,8 +36,7 @@ def test_oserrors_cause_serial_fall_back():
assert serial.call_count == 1
@mock.patch.object(multiprocessing, "get_start_method", return_value="fork")
def test_oserrors_are_reraised(_):
def test_oserrors_are_reraised():
"""Verify that unexpected OSErrors will cause the Manager to reraise."""
err = OSError(errno.EAGAIN, "Ominous message")
with mock.patch("_multiprocessing.SemLock", side_effect=err):
@ -51,14 +47,6 @@ def test_oserrors_are_reraised(_):
assert serial.call_count == 0
@mock.patch.object(multiprocessing, "get_start_method", return_value="spawn")
def test_multiprocessing_is_disabled(_):
"""Verify not being able to import multiprocessing forces jobs to 0."""
style_guide = style_guide_mock()
manager = checker.Manager(style_guide, finder.Checkers([], [], []))
assert manager.jobs == 0
def test_multiprocessing_cpu_count_not_implemented():
"""Verify that jobs is 0 if cpu_count is unavailable."""
style_guide = style_guide_mock()
@ -69,22 +57,18 @@ def test_multiprocessing_cpu_count_not_implemented():
"cpu_count",
side_effect=NotImplementedError,
):
manager = checker.Manager(style_guide, finder.Checkers([], [], []))
manager = checker.Manager(style_guide, finder.Checkers([], [], []), [])
assert manager.jobs == 0
@mock.patch.object(multiprocessing, "get_start_method", return_value="spawn")
def test_make_checkers(_):
def test_make_checkers():
"""Verify that we create a list of FileChecker instances."""
style_guide = style_guide_mock()
style_guide.options.filenames = ["file1", "file2"]
manager = checker.Manager(style_guide, finder.Checkers([], [], []))
manager = checker.Manager(style_guide, finder.Checkers([], [], []), [])
with mock.patch("flake8.utils.fnmatch", return_value=True):
with mock.patch("flake8.processor.FileProcessor"):
manager.make_checkers(["file1", "file2"])
manager.start()
assert manager._all_checkers
for file_checker in manager._all_checkers:
assert file_checker.filename in style_guide.options.filenames
assert not manager.checkers # the files don't exist
assert manager.filenames == ("file1", "file2")

View file

@ -1,3 +1,5 @@
from __future__ import annotations
from unittest import mock
from flake8._compat import importlib_metadata

View file

@ -1,4 +1,6 @@
"""Tests for the flake8.style_guide.DecisionEngine class."""
from __future__ import annotations
import argparse
import pytest

View file

@ -0,0 +1,36 @@
from __future__ import annotations
import pytest
from flake8.defaults import VALID_CODE_PREFIX
@pytest.mark.parametrize(
"s",
(
"E",
"E1",
"E123",
"ABC",
"ABC1",
"ABC123",
),
)
def test_valid_plugin_prefixes(s):
assert VALID_CODE_PREFIX.match(s)
@pytest.mark.parametrize(
"s",
(
"",
"A1234",
"ABCD",
"abc",
"a-b",
"",
"A𝟗",
),
)
def test_invalid_plugin_prefixes(s):
assert VALID_CODE_PREFIX.match(s) is None

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import os.path
import pytest
@ -123,7 +125,6 @@ def _expand_paths(
stdin_display_name="stdin",
filename_patterns=("*.py",),
exclude=(),
is_running_from_diff=False,
):
return set(
expand_paths(
@ -131,7 +132,6 @@ def _expand_paths(
stdin_display_name=stdin_display_name,
filename_patterns=filename_patterns,
exclude=exclude,
is_running_from_diff=is_running_from_diff,
)
)
@ -164,11 +164,3 @@ def test_alternate_stdin_name_is_filtered():
def test_filename_included_even_if_not_matching_include(tmp_path):
some_file = str(tmp_path.joinpath("some/file"))
assert _expand_paths(paths=(some_file,)) == {some_file}
def test_diff_filenames_filtered_by_patterns(tmp_path):
f1 = str(tmp_path.joinpath("f1"))
f2 = str(tmp_path.joinpath("f2.py"))
ret = _expand_paths(paths=(f1, f2), is_running_from_diff=True)
assert ret == {f2}

View file

@ -1,4 +1,6 @@
"""Tests for the flake8.exceptions module."""
from __future__ import annotations
import pickle
import pytest

View file

@ -1,4 +1,6 @@
"""Unit tests for the FileChecker class."""
from __future__ import annotations
import argparse
from unittest import mock

View file

@ -1,4 +1,6 @@
"""Tests for the FileProcessor class."""
from __future__ import annotations
import ast
import tokenize
from unittest import mock

View file

@ -1,4 +1,6 @@
"""Tests for the FilenameOnly formatter object."""
from __future__ import annotations
import argparse
from flake8.formatting import default

View file

@ -1,55 +1,12 @@
"""Tests for Flake8's legacy API."""
import argparse
import configparser
import os.path
from __future__ import annotations
from unittest import mock
import pytest
from flake8.api import legacy as api
from flake8.formatting import base as formatter
from flake8.options import config
def test_get_style_guide():
"""Verify the methods called on our internal Application."""
prelim_opts = argparse.Namespace(
append_config=[],
config=None,
isolated=False,
output_file=None,
verbose=0,
enable_extensions=None,
require_plugins=None,
)
mockedapp = mock.Mock()
mockedapp.parse_preliminary_options.return_value = (prelim_opts, [])
mockedapp.program = "flake8"
cfg = configparser.RawConfigParser()
cfg_dir = os.getcwd()
with mock.patch.object(config, "load_config", return_value=(cfg, cfg_dir)):
with mock.patch("flake8.main.application.Application") as application:
application.return_value = mockedapp
style_guide = api.get_style_guide()
application.assert_called_once_with()
mockedapp.parse_preliminary_options.assert_called_once_with([])
mockedapp.find_plugins.assert_called_once_with(
cfg,
cfg_dir,
enable_extensions=None,
require_plugins=None,
)
mockedapp.register_plugin_options.assert_called_once_with()
mockedapp.parse_configuration_and_cli.assert_called_once_with(
cfg, cfg_dir, []
)
mockedapp.make_formatter.assert_called_once_with()
mockedapp.make_guide.assert_called_once_with()
mockedapp.make_file_checker_manager.assert_called_once_with()
assert isinstance(style_guide, api.StyleGuide)
def test_styleguide_options():

View file

@ -1,3 +1,5 @@
from __future__ import annotations
from flake8.main import options

View file

@ -1,4 +1,6 @@
"""Tests for the Nothing formatter obbject."""
from __future__ import annotations
import argparse
from flake8.formatting import default

View file

@ -1,4 +1,6 @@
"""Unit tests for flake8.options.manager.Option."""
from __future__ import annotations
import functools
from unittest import mock

View file

@ -1,7 +1,8 @@
"""Unit tests for flake.options.manager.OptionManager."""
from __future__ import annotations
import argparse
import os
from unittest import mock
import pytest
@ -15,7 +16,10 @@ TEST_VERSION = "3.0.0b1"
def optmanager():
"""Generate a simple OptionManager with default test arguments."""
return manager.OptionManager(
version=TEST_VERSION, plugin_versions="", parents=[]
version=TEST_VERSION,
plugin_versions="",
parents=[],
formatter_names=[],
)
@ -32,7 +36,10 @@ def test_option_manager_including_parent_options():
# WHEN
optmanager = manager.OptionManager(
version=TEST_VERSION, plugin_versions="", parents=[parent_parser]
version=TEST_VERSION,
plugin_versions="",
parents=[parent_parser],
formatter_names=[],
)
options = optmanager.parse_args(["--parent", "foo"])
@ -162,96 +169,6 @@ def test_extend_default_ignore(optmanager):
assert optmanager.extended_default_ignore == ["T100", "T101", "T102"]
def test_optparse_normalize_callback_option_legacy(optmanager):
"""Test the optparse shim for `callback=`."""
callback_foo = mock.Mock()
optmanager.add_option(
"--foo",
action="callback",
callback=callback_foo,
callback_args=(1, 2),
callback_kwargs={"a": "b"},
)
callback_bar = mock.Mock()
optmanager.add_option(
"--bar",
action="callback",
type="string",
callback=callback_bar,
)
callback_baz = mock.Mock()
optmanager.add_option(
"--baz",
action="callback",
type="string",
nargs=2,
callback=callback_baz,
)
optmanager.parse_args(["--foo", "--bar", "bararg", "--baz", "1", "2"])
callback_foo.assert_called_once_with(
mock.ANY, # the option / action instance
"--foo",
None,
mock.ANY, # the OptionParser / ArgumentParser
1,
2,
a="b",
)
callback_bar.assert_called_once_with(
mock.ANY, # the option / action instance
"--bar",
"bararg",
mock.ANY, # the OptionParser / ArgumentParser
)
callback_baz.assert_called_once_with(
mock.ANY, # the option / action instance
"--baz",
("1", "2"),
mock.ANY, # the OptionParser / ArgumentParser
)
@pytest.mark.parametrize(
("type_s", "input_val", "expected"),
(
("int", "5", 5),
("long", "6", 6),
("string", "foo", "foo"),
("float", "1.5", 1.5),
("complex", "1+5j", 1 + 5j),
# optparse allows this but does not document it
("str", "foo", "foo"),
),
)
def test_optparse_normalize_types(optmanager, type_s, input_val, expected):
"""Test the optparse shim for type="typename"."""
optmanager.add_option("--foo", type=type_s)
opts = optmanager.parse_args(["--foo", input_val])
assert opts.foo == expected
def test_optparse_normalize_choice_type(optmanager):
"""Test the optparse shim for type="choice"."""
optmanager.add_option("--foo", type="choice", choices=("1", "2", "3"))
opts = optmanager.parse_args(["--foo", "1"])
assert opts.foo == "1"
# fails to parse
with pytest.raises(SystemExit):
optmanager.parse_args(["--foo", "4"])
def test_optparse_normalize_help(optmanager, capsys):
"""Test the optparse shim for %default in help text."""
optmanager.add_option("--foo", default="bar", help="default: %default")
with pytest.raises(SystemExit):
optmanager.parse_args(["--help"])
out, err = capsys.readouterr()
output = out + err
assert "default: bar" in output
@pytest.mark.parametrize(
("s", "is_auto", "n_jobs"),
(

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import configparser
import os.path
from unittest import mock
@ -166,7 +168,9 @@ def test_load_extra_config_utf8(tmpdir):
@pytest.fixture
def opt_manager():
ret = OptionManager(version="123", plugin_versions="", parents=[])
ret = OptionManager(
version="123", plugin_versions="", parents=[], formatter_names=[]
)
register_default_options(ret)
return ret
@ -216,3 +220,40 @@ def test_parse_config_ignores_unknowns(tmp_path, opt_manager, caplog):
def test_load_config_missing_file_raises_exception(capsys):
with pytest.raises(exceptions.ExecutionError):
config.load_config("foo.cfg", [])
def test_load_config_missing_append_config_raise_exception():
with pytest.raises(exceptions.ExecutionError):
config.load_config(None, ["dont_exist_config.cfg"], isolated=False)
def test_invalid_ignore_codes_raise_error(tmpdir, opt_manager):
tmpdir.join("setup.cfg").write("[flake8]\nignore = E203, //comment")
with tmpdir.as_cwd():
cfg, _ = config.load_config("setup.cfg", [], isolated=False)
with pytest.raises(ValueError) as excinfo:
config.parse_config(opt_manager, cfg, tmpdir)
expected = (
"Error code '//comment' supplied to 'ignore' option "
"does not match '^[A-Z]{1,3}[0-9]{0,3}$'"
)
(msg,) = excinfo.value.args
assert msg == expected
def test_invalid_extend_ignore_codes_raise_error(tmpdir, opt_manager):
tmpdir.join("setup.cfg").write("[flake8]\nextend-ignore = E203, //comment")
with tmpdir.as_cwd():
cfg, _ = config.load_config("setup.cfg", [], isolated=False)
with pytest.raises(ValueError) as excinfo:
config.parse_config(opt_manager, cfg, tmpdir)
expected = (
"Error code '//comment' supplied to 'extend-ignore' option "
"does not match '^[A-Z]{1,3}[0-9]{0,3}$'"
)
(msg,) = excinfo.value.args
assert msg == expected

View file

@ -1,4 +1,6 @@
"""Tests of pyflakes monkey patches."""
from __future__ import annotations
import ast
import pyflakes

View file

@ -1,4 +1,6 @@
"""Tests for the statistics module in Flake8."""
from __future__ import annotations
import pytest
from flake8 import statistics as stats

View file

@ -1,4 +1,6 @@
"""Tests for the flake8.style_guide.StyleGuide class."""
from __future__ import annotations
import argparse
from unittest import mock

View file

@ -1,4 +1,6 @@
"""Tests for flake8's utils module."""
from __future__ import annotations
import io
import logging
import os
@ -181,44 +183,6 @@ def test_fnmatch(filename, patterns, expected):
assert utils.fnmatch(filename, patterns) is expected
def read_diff_file(filename):
"""Read the diff file in its entirety."""
with open(filename) as fd:
content = fd.read()
return content
SINGLE_FILE_DIFF = read_diff_file("tests/fixtures/diffs/single_file_diff")
SINGLE_FILE_INFO = {
"flake8/utils.py": set(range(75, 83)).union(set(range(84, 94))),
}
TWO_FILE_DIFF = read_diff_file("tests/fixtures/diffs/two_file_diff")
TWO_FILE_INFO = {
"flake8/utils.py": set(range(75, 83)).union(set(range(84, 94))),
"tests/unit/test_utils.py": set(range(115, 128)),
}
MULTI_FILE_DIFF = read_diff_file("tests/fixtures/diffs/multi_file_diff")
MULTI_FILE_INFO = {
"flake8/utils.py": set(range(75, 83)).union(set(range(84, 94))),
"tests/unit/test_utils.py": set(range(115, 129)),
"tests/fixtures/diffs/single_file_diff": set(range(1, 28)),
"tests/fixtures/diffs/two_file_diff": set(range(1, 46)),
}
@pytest.mark.parametrize(
"diff, parsed_diff",
[
(SINGLE_FILE_DIFF, SINGLE_FILE_INFO),
(TWO_FILE_DIFF, TWO_FILE_INFO),
(MULTI_FILE_DIFF, MULTI_FILE_INFO),
],
)
def test_parse_unified_diff(diff, parsed_diff):
"""Verify that what we parse from a diff matches expectations."""
assert utils.parse_unified_diff(diff) == parsed_diff
def test_stdin_get_value_crlf():
"""Ensure that stdin is normalized from crlf to lf."""
stdin = io.TextIOWrapper(io.BytesIO(b"1\r\n2\r\n"), "UTF-8")

View file

@ -1,4 +1,6 @@
"""Tests for the flake8.violation.Violation class."""
from __future__ import annotations
from unittest import mock
import pytest
@ -49,22 +51,3 @@ def test_disable_is_inline_ignored():
assert error.is_inline_ignored(True) is False
assert getline.called is False
@pytest.mark.parametrize(
"violation_file,violation_line,diff,expected",
[
("file.py", 10, {}, True),
("file.py", 1, {"file.py": range(1, 2)}, True),
("file.py", 10, {"file.py": range(1, 2)}, False),
("file.py", 1, {"other.py": range(1, 2)}, False),
("file.py", 10, {"other.py": range(1, 2)}, False),
],
)
def test_violation_is_in_diff(violation_file, violation_line, diff, expected):
"""Verify that we find violations within a diff."""
violation = Violation(
"E001", violation_file, violation_line, 1, "warning", "line"
)
assert violation.is_in(diff) is expected

View file

@ -1,6 +1,6 @@
[tox]
minversion=2.3.1
envlist = py36,py37,py38,flake8,linters,docs
envlist = py37,py38,flake8,linters,docs
[testenv]
deps =