Merge branch 'py3_plus' into 'master'

drop legacy python versions

Closes #690

See merge request pycqa/flake8!471
This commit is contained in:
Anthony Sottile 2021-03-31 14:22:37 +00:00
commit e0116d8e77
65 changed files with 629 additions and 731 deletions

View file

@ -1,8 +1,8 @@
# To activate, change the Appveyor settings to use `.appveyor.yml`.
install:
- python -m pip install --upgrade setuptools tox virtualenv
- C:\Python38-x64\python.exe -m pip install --upgrade setuptools tox virtualenv
build: off
test_script:
- python -m tox -e py27,py36,py37,dogfood
- C:\Python38-x64\python.exe -m tox -e py36,py37,dogfood

View file

@ -30,3 +30,4 @@ exclude_lines =
# Don't complain if non-runnable code isn't run:
^if __name__ == ['"]__main__['"]:$
^\s*if False:
^\s*if TYPE_CHECKING:

View file

@ -13,26 +13,11 @@ after_script:
- pip install codecov
- codecov --token=7d117e6b-aab6-4283-ab19-166dafc38cf5
pypy2:
image: pypy:2.7-7.2.0
stage: test
script: tox -e pypy
pypy3:
image: pypy:3.6-7.2.0
stage: test
script: tox -e pypy3
python2:
image: python:2.7
stage: test
script: tox -e py27
python35:
image: python:3.5
stage: test
script: tox -e py35
python36:
image: python:3.6
stage: test

View file

@ -1,3 +1,4 @@
exclude: ^tests/fixtures/example-code/
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.3.0
@ -7,12 +8,22 @@ repos:
- id: end-of-file-fixer
- id: trailing-whitespace
exclude: ^tests/fixtures/diffs/
- repo: https://github.com/asottile/reorder_python_imports
rev: v2.4.0
hooks:
- id: reorder-python-imports
args: [--application-directories, '.:src', --py36-plus]
- repo: https://github.com/psf/black
rev: 20.8b1
hooks:
- id: black
args: [--line-length=78]
files: ^src/
- repo: https://github.com/asottile/pyupgrade
rev: v2.11.0
hooks:
- id: pyupgrade
args: [--py36-plus]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.720
hooks:

View file

@ -11,18 +11,12 @@ notifications:
matrix:
include:
- python: 2.7
env: TOXENV=py27
- python: 3.5
env: TOXENV=py35
- python: 3.6
env: TOXENV=py36
- python: 3.7
env: TOXENV=py37
- python: 3.8
env: TOXENV=py38
- python: pypy
env: TOXENV=pypy
- python: 3.7
env: TOXENV=readme
- python: 3.7

View file

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# flake8 documentation build configuration file, created by
# sphinx-quickstart on Tue Jan 19 07:14:10 2016.
@ -11,9 +10,8 @@
#
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys
import os
import sys
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
@ -53,11 +51,12 @@ source_suffix = '.rst'
master_doc = 'index'
# General information about the project.
project = u'flake8'
copyright = u'2016, Ian Stapleton Cordasco'
author = u'Ian Stapleton Cordasco'
project = 'flake8'
copyright = '2016, Ian Stapleton Cordasco'
author = 'Ian Stapleton Cordasco'
import flake8
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
@ -234,8 +233,8 @@ latex_elements = {
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'flake8.tex', u'flake8 Documentation',
u'Ian Stapleton Cordasco', 'manual'),
(master_doc, 'flake8.tex', 'flake8 Documentation',
'Ian Stapleton Cordasco', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
@ -264,7 +263,7 @@ latex_documents = [
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
('manpage', 'flake8', u'Flake8 Command Line Documentation',
('manpage', 'flake8', 'Flake8 Command Line Documentation',
[author], 1)
]
@ -278,7 +277,7 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
('index', 'Flake8', u'Flake8 Documentation', u'Tarek Ziade',
('index', 'Flake8', 'Flake8 Documentation', 'Tarek Ziade',
'Flake8', 'Code checking using pycodestyle, pyflakes and mccabe',
'Miscellaneous'),
]

View file

@ -81,13 +81,11 @@ for users.
Before releasing, the following tox test environments must pass:
- Python 2.7 (a.k.a., ``tox -e py27``)
- Python 3.6 (a.k.a., ``tox -e py36``)
- Python 3.7 (a.k.a., ``tox -e py37``)
- PyPy (a.k.a., ``tox -e pypy``)
- PyPy 3 (a.k.a., ``tox -e pypy3``)
- Linters (a.k.a., ``tox -e linters``)

View file

@ -175,11 +175,7 @@ across multiple lines, insert a new-line after the opening parenthesis, e.g.,
statistic = next(stats_for_error_code)
count = statistic.count
count += sum(stat.count for stat in stats_for_error_code)
self._write('{count:<5} {error_code} {message}'.format(
count=count,
error_code=error_code,
message=statistic.message,
))
self._write(f'{count:<5} {error_code} {statistic.message}')
In the first example, we put a few of the parameters all on one line, and then
added the last two on their own. In the second example, each parameter has its

View file

@ -56,7 +56,6 @@ like:
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 3",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Software Development :: Quality Assurance",

View file

@ -1,5 +1,4 @@
sphinx>=1.3.0,!=3.1.0
sphinx_rtd_theme
sphinx-prompt
configparser
flake8-polyfill

View file

@ -100,8 +100,6 @@ generates its own :term:`error code`\ s for ``pyflakes``:
+------+---------------------------------------------------------------------+
| F811 | redefinition of unused ``name`` from line ``N`` |
+------+---------------------------------------------------------------------+
| F812 | list comprehension redefines ``name`` from line ``N`` |
+------+---------------------------------------------------------------------+
| F821 | undefined name ``name`` |
+------+---------------------------------------------------------------------+
| F822 | undefined name ``name`` in ``__all__`` |
@ -116,9 +114,6 @@ generates its own :term:`error code`\ s for ``pyflakes``:
| F901 | ``raise NotImplemented`` should be ``raise NotImplementedError`` |
+------+---------------------------------------------------------------------+
Note that some of these entries behave differently on Python 2 and Python 3,
for example F812 is specific to Python 2 only.
We also report one extra error: ``E999``. We report ``E999`` when we fail to
compile a file into an Abstract Syntax Tree for the plugins that require it.

View file

@ -14,25 +14,25 @@ like so:
Where you simply allow the shell running in your terminal to locate |Flake8|.
In some cases, though, you may have installed |Flake8| for multiple versions
of Python (e.g., Python 2.7 and Python 3.5) and you need to call a specific
of Python (e.g., Python 3.8 and Python 3.9) and you need to call a specific
version. In that case, you will have much better results using:
.. prompt:: bash
python2.7 -m flake8
python3.8 -m flake8
Or
.. prompt:: bash
python3.5 -m flake8
python3.9 -m flake8
Since that will tell the correct version of Python to run |Flake8|.
.. note::
Installing |Flake8| once will not install it on both Python 2.7 and
Python 3.5. It will only install it for the version of Python that
Installing |Flake8| once will not install it on both Python 3.8 and
Python 3.9. It will only install it for the version of Python that
is running pip.
It is also possible to specify command-line options directly to |Flake8|:

View file

@ -24,10 +24,8 @@ appropriate of:
pip install <plugin-name>
pip3 install <plugin-name>
python -m pip install <plugin-name>
python2.7 -m pip install <plugin-name>
python3 -m pip install <plugin-name>
python3.4 -m pip install <plugin-name>
python3.5 -m pip install <plugin-name>
python3.9 -m pip install <plugin-name>
To install the plugin, where ``<plugin-name>`` is the package name on PyPI_.
To verify installation use:

View file

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
import setuptools
setuptools.setup(
@ -21,11 +20,9 @@ setuptools.setup(
'Framework :: Flake8',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: Software Development :: Quality Assurance',
],

View file

@ -1,7 +1,6 @@
"""Module for an example Flake8 plugin."""
from .on_by_default import ExampleOne
from .off_by_default import ExampleTwo
from .on_by_default import ExampleOne
__all__ = (
'ExampleOne',

View file

@ -1,7 +1,7 @@
"""Our first example plugin."""
class ExampleTwo(object):
class ExampleTwo:
"""Second Example Plugin."""
name = 'off-by-default-example-plugin'
version = '1.0.0'

View file

@ -1,7 +1,7 @@
"""Our first example plugin."""
class ExampleOne(object):
class ExampleOne:
"""First Example Plugin."""
name = 'on-by-default-example-plugin'
version = '1.0.0'
@ -11,5 +11,4 @@ class ExampleOne(object):
def run(self):
"""Do nothing."""
for message in []:
yield message
yield from []

View file

@ -1,10 +1,4 @@
[pytest]
norecursedirs = .git .* *.egg* old docs dist build
addopts = -rw
filterwarnings =
error
# python3.4 raises this when importing setuptools
ignore:The value of convert_charrefs will become True in 3.5.*:DeprecationWarning
# python3 raises this when importing setuptools
ignore:the imp module is deprecated in favour of importlib.*:PendingDeprecationWarning
ignore:the imp module is deprecated in favour of importlib.*:DeprecationWarning
filterwarnings = error

View file

@ -20,10 +20,7 @@ classifiers =
Intended Audience :: Developers
License :: OSI Approved :: MIT License
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.7
Programming Language :: Python :: 3
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
@ -44,13 +41,9 @@ install_requires=
pyflakes >= 2.3.0, < 2.4.0
pycodestyle >= 2.7.0, < 2.8.0
mccabe >= 0.6.0, < 0.7.0
enum34; python_version<"3.4"
typing; python_version<"3.5"
configparser; python_version<"3.2"
functools32; python_version<"3.2"
importlib-metadata; python_version<"3.8"
python_requires = >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*
python_requires = >=3.6
[options.packages.find]
where = src

View file

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Packaging logic for Flake8."""
import os
import sys

View file

@ -11,9 +11,7 @@ This module
"""
import logging
import sys
if False: # `typing.TYPE_CHECKING` was introduced in 3.5.2
from typing import Type # `typing.Type` was introduced in 3.5.2
from typing import Type
LOG = logging.getLogger(__name__)
LOG.addHandler(logging.NullHandler())
@ -64,7 +62,7 @@ def configure_logging(verbosity, filename=None, logformat=LOG_FORMAT):
if not filename or filename in ("stderr", "stdout"):
fileobj = getattr(sys, filename or "stderr")
handler_cls = logging.StreamHandler # type: Type[logging.Handler]
handler_cls: Type[logging.Handler] = logging.StreamHandler
else:
fileobj = filename
handler_cls = logging.FileHandler

View file

@ -1,14 +1,9 @@
"""Expose backports in a single place."""
import sys
if sys.version_info >= (3,): # pragma: no cover (PY3+)
from functools import lru_cache
else: # pragma: no cover (<PY3)
from functools32 import lru_cache
if sys.version_info >= (3, 8): # pragma: no cover (PY38+)
import importlib.metadata as importlib_metadata
else: # pragma: no cover (<PY38)
import importlib_metadata
__all__ = ("lru_cache", "importlib_metadata")
__all__ = ("importlib_metadata",)

View file

@ -60,7 +60,7 @@ def get_style_guide(**kwargs):
return StyleGuide(application)
class StyleGuide(object):
class StyleGuide:
"""Public facing object that mimic's Flake8 2.0's StyleGuide.
.. note::
@ -81,7 +81,7 @@ class StyleGuide(object):
self._file_checker_manager = application.file_checker_manager
@property
def options(self): # type: () -> argparse.Namespace
def options(self) -> argparse.Namespace:
"""Return application's options.
An instance of :class:`argparse.Namespace` containing parsed options.
@ -170,7 +170,7 @@ class StyleGuide(object):
return self.check_files([filename])
class Report(object):
class Report:
"""Public facing object that mimic's Flake8 2.0's API.
.. note::
@ -210,6 +210,6 @@ class Report(object):
list
"""
return [
"{} {} {}".format(s.count, s.error_code, s.message)
f"{s.count} {s.error_code} {s.message}"
for s in self._stats.statistics_for(violation)
]

View file

@ -4,20 +4,22 @@ import errno
import itertools
import logging
import signal
import sys
import tokenize
from typing import Dict, List, Optional, Tuple
try:
import multiprocessing.pool
except ImportError:
multiprocessing = None # type: ignore
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from flake8 import defaults
from flake8 import exceptions
from flake8 import processor
from flake8 import utils
try:
import multiprocessing.pool
except ImportError:
multiprocessing = None # type: ignore
LOG = logging.getLogger(__name__)
SERIAL_RETRY_ERRNOS = {
@ -38,17 +40,14 @@ SERIAL_RETRY_ERRNOS = {
def _multiprocessing_is_fork(): # type () -> bool
"""Class state is only preserved when using the `fork` strategy."""
if sys.version_info >= (3, 4):
return (
multiprocessing
# https://github.com/python/typeshed/pull/3415
and multiprocessing.get_start_method() == "fork" # type: ignore
)
else:
return multiprocessing and not utils.is_windows()
return (
multiprocessing
# https://github.com/python/typeshed/pull/3415
and multiprocessing.get_start_method() == "fork" # type: ignore
)
class Manager(object):
class Manager:
"""Manage the parallelism and checker instances for each plugin and file.
This class will be responsible for the following:
@ -86,8 +85,8 @@ class Manager(object):
self.options = style_guide.options
self.checks = checker_plugins
self.jobs = self._job_count()
self._all_checkers = [] # type: List[FileChecker]
self.checkers = [] # type: List[FileChecker]
self._all_checkers: List[FileChecker] = []
self.checkers: List[FileChecker] = []
self.statistics = {
"files": 0,
"logical lines": 0,
@ -104,8 +103,7 @@ class Manager(object):
self.statistics[statistic] += checker.statistics[statistic]
self.statistics["files"] += len(self.checkers)
def _job_count(self):
# type: () -> int
def _job_count(self) -> int:
# First we walk through all of our error cases:
# - multiprocessing library is not present
# - we're running on windows in which case we know we have significant
@ -166,8 +164,7 @@ class Manager(object):
)
return reported_results_count
def is_path_excluded(self, path):
# type: (str) -> bool
def is_path_excluded(self, path: str) -> bool:
"""Check if a path is excluded.
:param str path:
@ -190,8 +187,7 @@ class Manager(object):
logger=LOG,
)
def make_checkers(self, paths=None):
# type: (Optional[List[str]]) -> None
def make_checkers(self, paths: Optional[List[str]] = None) -> None:
"""Create checkers for each file."""
if paths is None:
paths = self.arguments
@ -236,8 +232,7 @@ class Manager(object):
self.checkers = [c for c in self._all_checkers if c.should_process]
LOG.info("Checking %d files", len(self.checkers))
def report(self):
# type: () -> Tuple[int, int]
def report(self) -> Tuple[int, int]:
"""Report all of the errors found in the managed file checkers.
This iterates over each of the checkers and reports the errors sorted
@ -259,11 +254,11 @@ class Manager(object):
results_found += len(results)
return (results_found, results_reported)
def run_parallel(self): # type: () -> None
def run_parallel(self) -> None:
"""Run the checkers in parallel."""
# fmt: off
final_results = collections.defaultdict(list) # type: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] # noqa: E501
final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, int]] # noqa: E501
final_results: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] = collections.defaultdict(list) # noqa: E501
final_statistics: Dict[str, Dict[str, int]] = collections.defaultdict(dict) # noqa: E501
# fmt: on
pool = _try_initialize_processpool(self.jobs)
@ -298,12 +293,12 @@ class Manager(object):
checker.results = final_results[filename]
checker.statistics = final_statistics[filename]
def run_serial(self): # type: () -> None
def run_serial(self) -> None:
"""Run the checkers in serial."""
for checker in self.checkers:
checker.run_checks()
def run(self): # type: () -> None
def run(self) -> None:
"""Run all the checkers.
This will intelligently decide whether to run the checks in parallel
@ -337,7 +332,7 @@ class Manager(object):
self._process_statistics()
class FileChecker(object):
class FileChecker:
"""Manage running checks for a file and aggregate the results."""
def __init__(self, filename, checks, options):
@ -357,9 +352,7 @@ class FileChecker(object):
self.options = options
self.filename = filename
self.checks = checks
# fmt: off
self.results = [] # type: List[Tuple[str, int, int, str, Optional[str]]] # noqa: E501
# fmt: on
self.results: List[Tuple[str, int, int, str, Optional[str]]] = []
self.statistics = {
"tokens": 0,
"logical lines": 0,
@ -373,27 +366,30 @@ class FileChecker(object):
self.should_process = not self.processor.should_ignore_file()
self.statistics["physical lines"] = len(self.processor.lines)
def __repr__(self): # type: () -> str
def __repr__(self) -> str:
"""Provide helpful debugging representation."""
return "FileChecker for {}".format(self.filename)
return f"FileChecker for {self.filename}"
def _make_processor(self):
# type: () -> Optional[processor.FileProcessor]
def _make_processor(self) -> Optional[processor.FileProcessor]:
try:
return processor.FileProcessor(self.filename, self.options)
except IOError as e:
except OSError as e:
# If we can not read the file due to an IOError (e.g., the file
# does not exist or we do not have the permissions to open it)
# then we need to format that exception for the user.
# NOTE(sigmavirus24): Historically, pep8 has always reported this
# as an E902. We probably *want* a better error code for this
# going forward.
message = "{0}: {1}".format(type(e).__name__, e)
self.report("E902", 0, 0, message)
self.report("E902", 0, 0, f"{type(e).__name__}: {e}")
return None
def report(self, error_code, line_number, column, text):
# type: (Optional[str], int, int, str) -> str
def report(
self,
error_code: Optional[str],
line_number: int,
column: int,
text: str,
) -> str:
"""Report an error by storing it in the results list."""
if error_code is None:
error_code, text = text.split(" ", 1)
@ -469,14 +465,14 @@ class FileChecker(object):
column -= column_offset
return row, column
def run_ast_checks(self): # type: () -> None
def run_ast_checks(self) -> None:
"""Run all checks expecting an abstract syntax tree."""
try:
ast = self.processor.build_ast()
except (ValueError, SyntaxError, TypeError) as e:
row, column = self._extract_syntax_information(e)
self.report(
"E999", row, column, "%s: %s" % (type(e).__name__, e.args[0])
"E999", row, column, f"{type(e).__name__}: {e.args[0]}"
)
return
@ -608,8 +604,9 @@ class FileChecker(object):
else:
self.run_logical_checks()
def check_physical_eol(self, token, prev_physical):
# type: (processor._Token, str) -> None
def check_physical_eol(
self, token: processor._Token, prev_physical: str
) -> None:
"""Run physical checks if and only if it is at the end of the line."""
# a newline token ends a single physical line.
if processor.is_eol_token(token):
@ -638,13 +635,14 @@ class FileChecker(object):
self.run_physical_checks(line + "\n")
def _pool_init(): # type: () -> None
def _pool_init() -> None:
"""Ensure correct signaling of ^C using multiprocessing.Pool."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _try_initialize_processpool(job_count):
# type: (int) -> Optional[multiprocessing.pool.Pool]
def _try_initialize_processpool(
job_count: int,
) -> Optional[multiprocessing.pool.Pool]:
"""Return a new process pool instance if we are able to create one."""
try:
return multiprocessing.Pool(job_count, _pool_init)
@ -673,8 +671,9 @@ def _run_checks(checker):
return checker.run_checks()
def find_offset(offset, mapping):
# type: (int, processor._LogicalMapping) -> Tuple[int, int]
def find_offset(
offset: int, mapping: processor._LogicalMapping
) -> Tuple[int, int]:
"""Find the offset tuple for a single offset."""
if isinstance(offset, tuple):
return offset

View file

@ -19,14 +19,13 @@ class FailedToLoadPlugin(Flake8Exception):
FORMAT = 'Flake8 failed to load plugin "%(name)s" due to %(exc)s.'
def __init__(self, plugin_name, exception):
# type: (str, Exception) -> None
def __init__(self, plugin_name: str, exception: Exception) -> None:
"""Initialize our FailedToLoadPlugin exception."""
self.plugin_name = plugin_name
self.original_exception = exception
super(FailedToLoadPlugin, self).__init__(plugin_name, exception)
super().__init__(plugin_name, exception)
def __str__(self): # type: () -> str
def __str__(self) -> str:
"""Format our exception message."""
return self.FORMAT % {
"name": self.plugin_name,
@ -37,18 +36,18 @@ class FailedToLoadPlugin(Flake8Exception):
class InvalidSyntax(Flake8Exception):
"""Exception raised when tokenizing a file fails."""
def __init__(self, exception): # type: (Exception) -> None
def __init__(self, exception: Exception) -> None:
"""Initialize our InvalidSyntax exception."""
self.original_exception = exception
self.error_message = "{0}: {1}".format(
exception.__class__.__name__, exception.args[0]
self.error_message = (
f"{type(exception).__name__}: {exception.args[0]}"
)
self.error_code = "E902"
self.line_number = 1
self.column_number = 0
super(InvalidSyntax, self).__init__(exception)
super().__init__(exception)
def __str__(self): # type: () -> str
def __str__(self) -> str:
"""Format our exception message."""
return self.error_message
@ -58,16 +57,13 @@ class PluginRequestedUnknownParameters(Flake8Exception):
FORMAT = '"%(name)s" requested unknown parameters causing %(exc)s'
def __init__(self, plugin, exception):
# type: (Dict[str, str], Exception) -> None
def __init__(self, plugin: Dict[str, str], exception: Exception) -> None:
"""Pop certain keyword arguments for initialization."""
self.plugin = plugin
self.original_exception = exception
super(PluginRequestedUnknownParameters, self).__init__(
plugin, exception
)
super().__init__(plugin, exception)
def __str__(self): # type: () -> str
def __str__(self) -> str:
"""Format our exception message."""
return self.FORMAT % {
"name": self.plugin["plugin_name"],
@ -80,14 +76,13 @@ class PluginExecutionFailed(Flake8Exception):
FORMAT = '"%(name)s" failed during execution due to "%(exc)s"'
def __init__(self, plugin, exception):
# type: (Dict[str, str], Exception) -> None
def __init__(self, plugin: Dict[str, str], exception: Exception) -> None:
"""Utilize keyword arguments for message generation."""
self.plugin = plugin
self.original_exception = exception
super(PluginExecutionFailed, self).__init__(plugin, exception)
super().__init__(plugin, exception)
def __str__(self): # type: () -> str
def __str__(self) -> str:
"""Format our exception message."""
return self.FORMAT % {
"name": self.plugin["plugin_name"],

View file

@ -1,15 +1,17 @@
"""The base class and interface for all formatting plugins."""
from __future__ import print_function
import argparse
from typing import IO, List, Optional, Tuple
from typing import IO
from typing import List
from typing import Optional
from typing import Tuple
from typing import TYPE_CHECKING
if False: # `typing.TYPE_CHECKING` was introduced in 3.5.2
if TYPE_CHECKING:
from flake8.statistics import Statistics
from flake8.style_guide import Violation
class BaseFormatter(object):
class BaseFormatter:
"""Class defining the formatter interface.
.. attribute:: options
@ -31,8 +33,7 @@ class BaseFormatter(object):
output filename has been specified.
"""
def __init__(self, options):
# type: (argparse.Namespace) -> None
def __init__(self, options: argparse.Namespace) -> None:
"""Initialize with the options parsed from config and cli.
This also calls a hook, :meth:`after_init`, so subclasses do not need
@ -46,14 +47,14 @@ class BaseFormatter(object):
"""
self.options = options
self.filename = options.output_file
self.output_fd = None # type: Optional[IO[str]]
self.output_fd: Optional[IO[str]] = None
self.newline = "\n"
self.after_init()
def after_init(self): # type: () -> None
def after_init(self) -> None:
"""Initialize the formatter further."""
def beginning(self, filename): # type: (str) -> None
def beginning(self, filename: str) -> None:
"""Notify the formatter that we're starting to process a file.
:param str filename:
@ -61,7 +62,7 @@ class BaseFormatter(object):
from.
"""
def finished(self, filename): # type: (str) -> None
def finished(self, filename: str) -> None:
"""Notify the formatter that we've finished processing a file.
:param str filename:
@ -69,7 +70,7 @@ class BaseFormatter(object):
from.
"""
def start(self): # type: () -> None
def start(self) -> None:
"""Prepare the formatter to receive input.
This defaults to initializing :attr:`output_fd` if :attr:`filename`
@ -77,7 +78,7 @@ class BaseFormatter(object):
if self.filename:
self.output_fd = open(self.filename, "a")
def handle(self, error): # type: (Violation) -> None
def handle(self, error: "Violation") -> None:
"""Handle an error reported by Flake8.
This defaults to calling :meth:`format`, :meth:`show_source`, and
@ -94,7 +95,7 @@ class BaseFormatter(object):
source = self.show_source(error)
self.write(line, source)
def format(self, error): # type: (Violation) -> Optional[str]
def format(self, error: "Violation") -> Optional[str]:
"""Format an error reported by Flake8.
This method **must** be implemented by subclasses.
@ -113,23 +114,16 @@ class BaseFormatter(object):
"Subclass of BaseFormatter did not implement" " format."
)
def show_statistics(self, statistics): # type: (Statistics) -> None
def show_statistics(self, statistics: "Statistics") -> None:
"""Format and print the statistics."""
for error_code in statistics.error_codes():
stats_for_error_code = statistics.statistics_for(error_code)
statistic = next(stats_for_error_code)
count = statistic.count
count += sum(stat.count for stat in stats_for_error_code)
self._write(
"{count:<5} {error_code} {message}".format(
count=count,
error_code=error_code,
message=statistic.message,
)
)
self._write(f"{count:<5} {error_code} {statistic.message}")
def show_benchmarks(self, benchmarks):
# type: (List[Tuple[str, float]]) -> None
def show_benchmarks(self, benchmarks: List[Tuple[str, float]]) -> None:
"""Format and print the benchmarks."""
# NOTE(sigmavirus24): The format strings are a little confusing, even
# to me, so here's a quick explanation:
@ -150,7 +144,7 @@ class BaseFormatter(object):
benchmark = float_format(statistic=statistic, value=value)
self._write(benchmark)
def show_source(self, error): # type: (Violation) -> Optional[str]
def show_source(self, error: "Violation") -> Optional[str]:
"""Show the physical line generating the error.
This also adds an indicator for the particular part of the line that
@ -179,17 +173,16 @@ class BaseFormatter(object):
)
# Physical lines have a newline at the end, no need to add an extra
# one
return "{}{}^".format(error.physical_line, indent)
return f"{error.physical_line}{indent}^"
def _write(self, output): # type: (str) -> None
def _write(self, output: str) -> None:
"""Handle logic of whether to use an output file or print()."""
if self.output_fd is not None:
self.output_fd.write(output + self.newline)
if self.output_fd is None or self.options.tee:
print(output, end=self.newline)
def write(self, line, source):
# type: (Optional[str], Optional[str]) -> None
def write(self, line: Optional[str], source: Optional[str]) -> None:
"""Write the line either to the output file or stdout.
This handles deciding whether to write to a file or print to standard
@ -207,7 +200,7 @@ class BaseFormatter(object):
if source:
self._write(source)
def stop(self): # type: () -> None
def stop(self) -> None:
"""Clean up after reporting is finished."""
if self.output_fd is not None:
self.output_fd.close()

View file

@ -1,9 +1,11 @@
"""Default formatting class for Flake8."""
from typing import Optional, Set
from typing import Optional
from typing import Set
from typing import TYPE_CHECKING
from flake8.formatting import base
if False: # `typing.TYPE_CHECKING` was introduced in 3.5.2
if TYPE_CHECKING:
from flake8.style_guide import Violation
@ -23,9 +25,9 @@ class SimpleFormatter(base.BaseFormatter):
"""
error_format = None # type: str
error_format: str
def format(self, error): # type: (Violation) -> Optional[str]
def format(self, error: "Violation") -> Optional[str]:
"""Format and write error out.
If an output filename is specified, write formatted errors to that
@ -49,7 +51,7 @@ class Default(SimpleFormatter):
error_format = "%(path)s:%(row)d:%(col)d: %(code)s %(text)s"
def after_init(self): # type: () -> None
def after_init(self) -> None:
"""Check for a custom format string."""
if self.options.format.lower() != "default":
self.error_format = self.options.format
@ -66,18 +68,18 @@ class FilenameOnly(SimpleFormatter):
error_format = "%(path)s"
def after_init(self): # type: () -> None
def after_init(self) -> None:
"""Initialize our set of filenames."""
self.filenames_already_printed = set() # type: Set[str]
self.filenames_already_printed: Set[str] = set()
def show_source(self, error): # type: (Violation) -> Optional[str]
def show_source(self, error: "Violation") -> Optional[str]:
"""Do not include the source code."""
def format(self, error): # type: (Violation) -> Optional[str]
def format(self, error: "Violation") -> Optional[str]:
"""Ensure we only print each error once."""
if error.filename not in self.filenames_already_printed:
self.filenames_already_printed.add(error.filename)
return super(FilenameOnly, self).format(error)
return super().format(error)
else:
return None
@ -85,8 +87,8 @@ class FilenameOnly(SimpleFormatter):
class Nothing(base.BaseFormatter):
"""Print absolutely nothing."""
def format(self, error): # type: (Violation) -> Optional[str]
def format(self, error: "Violation") -> Optional[str]:
"""Do nothing."""
def show_source(self, error): # type: (Violation) -> Optional[str]
def show_source(self, error: "Violation") -> Optional[str]:
"""Do not print the source."""

View file

@ -1,11 +1,15 @@
"""Module containing the application logic for Flake8."""
from __future__ import print_function
import argparse
import logging
import sys
import time
from typing import Dict, List, Optional, Set, Tuple
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
import flake8
from flake8 import checker
@ -14,19 +18,19 @@ from flake8 import exceptions
from flake8 import style_guide
from flake8 import utils
from flake8.main import options
from flake8.options import aggregator, config
from flake8.options import aggregator
from flake8.options import config
from flake8.options import manager
from flake8.plugins import manager as plugin_manager
if False: # `typing.TYPE_CHECKING` was introduced in 3.5.2
from typing import Type # `typing.Type` was introduced in 3.5.2
if TYPE_CHECKING:
from flake8.formatting.base import BaseFormatter
LOG = logging.getLogger(__name__)
class Application(object):
class Application:
"""Abstract our application into a class."""
def __init__(self, program="flake8", version=flake8.__version__):
@ -40,7 +44,7 @@ class Application(object):
#: The timestamp when the Application instance was instantiated.
self.start_time = time.time()
#: The timestamp when the Application finished reported errors.
self.end_time = None # type: float
self.end_time: float = None
#: The name of the program being run
self.program = program
#: The version of the program being run
@ -59,26 +63,24 @@ class Application(object):
options.register_default_options(self.option_manager)
#: The instance of :class:`flake8.plugins.manager.Checkers`
self.check_plugins = None # type: plugin_manager.Checkers
# fmt: off
self.check_plugins: plugin_manager.Checkers = None
#: The instance of :class:`flake8.plugins.manager.ReportFormatters`
self.formatting_plugins = None # type: plugin_manager.ReportFormatters
# fmt: on
self.formatting_plugins: plugin_manager.ReportFormatters = None
#: The user-selected formatter from :attr:`formatting_plugins`
self.formatter = None # type: BaseFormatter
self.formatter: BaseFormatter = None
#: The :class:`flake8.style_guide.StyleGuideManager` built from the
#: user's options
self.guide = None # type: style_guide.StyleGuideManager
self.guide: style_guide.StyleGuideManager = None
#: The :class:`flake8.checker.Manager` that will handle running all of
#: the checks selected by the user.
self.file_checker_manager = None # type: checker.Manager
self.file_checker_manager: checker.Manager = None
#: The user-supplied options parsed into an instance of
#: :class:`argparse.Namespace`
self.options = None # type: argparse.Namespace
self.options: argparse.Namespace = None
#: The left over arguments that were not parsed by
#: :attr:`option_manager`
self.args = None # type: List[str]
self.args: List[str] = None
#: The number of errors, warnings, and other messages after running
#: flake8 and taking into account ignored errors and lines.
self.result_count = 0
@ -92,10 +94,11 @@ class Application(object):
#: Whether the program is processing a diff or not
self.running_against_diff = False
#: The parsed diff information
self.parsed_diff = {} # type: Dict[str, Set[int]]
self.parsed_diff: Dict[str, Set[int]] = {}
def parse_preliminary_options(self, argv):
# type: (List[str]) -> Tuple[argparse.Namespace, List[str]]
def parse_preliminary_options(
self, argv: List[str]
) -> Tuple[argparse.Namespace, List[str]]:
"""Get preliminary options from the CLI, pre-plugin-loading.
We need to know the values of a few standard options so that we can
@ -119,8 +122,7 @@ class Application(object):
rest.extend(("--output-file", args.output_file))
return args, rest
def exit(self):
# type: () -> None
def exit(self) -> None:
"""Handle finalization and exiting the program.
This should be the last thing called on the application instance. It
@ -136,8 +138,7 @@ class Application(object):
(self.result_count > 0) or self.catastrophic_failure
)
def find_plugins(self, config_finder):
# type: (config.ConfigFileFinder) -> None
def find_plugins(self, config_finder: config.ConfigFileFinder) -> None:
"""Find and load the plugins for this application.
Set the :attr:`check_plugins` and :attr:`formatting_plugins` attributes
@ -159,8 +160,7 @@ class Application(object):
self.check_plugins.load_plugins()
self.formatting_plugins.load_plugins()
def register_plugin_options(self):
# type: () -> None
def register_plugin_options(self) -> None:
"""Register options provided by plugins to our option manager."""
self.check_plugins.register_options(self.option_manager)
self.check_plugins.register_plugin_versions(self.option_manager)
@ -168,10 +168,9 @@ class Application(object):
def parse_configuration_and_cli(
self,
config_finder, # type: config.ConfigFileFinder
argv, # type: List[str]
):
# type: (...) -> None
config_finder: config.ConfigFileFinder,
argv: List[str],
) -> None:
"""Parse configuration files and the CLI options.
:param config.ConfigFileFinder config_finder:
@ -211,8 +210,9 @@ class Application(object):
return formatter_plugin.execute
def make_formatter(self, formatter_class=None):
# type: (Optional[Type[BaseFormatter]]) -> None
def make_formatter(
self, formatter_class: Optional[Type["BaseFormatter"]] = None
) -> None:
"""Initialize a formatter based on the parsed options."""
format_plugin = self.options.format
if 1 <= self.options.quiet < 2:
@ -225,8 +225,7 @@ class Application(object):
self.formatter = formatter_class(self.options)
def make_guide(self):
# type: () -> None
def make_guide(self) -> None:
"""Initialize our StyleGuide."""
self.guide = style_guide.StyleGuideManager(
self.options, self.formatter
@ -235,8 +234,7 @@ class Application(object):
if self.running_against_diff:
self.guide.add_diff_ranges(self.parsed_diff)
def make_file_checker_manager(self):
# type: () -> None
def make_file_checker_manager(self) -> None:
"""Initialize our FileChecker Manager."""
self.file_checker_manager = checker.Manager(
style_guide=self.guide,
@ -244,8 +242,7 @@ class Application(object):
checker_plugins=self.check_plugins,
)
def run_checks(self, files=None):
# type: (Optional[List[str]]) -> None
def run_checks(self, files: Optional[List[str]] = None) -> None:
"""Run the actual checks with the FileChecker Manager.
This method encapsulates the logic to make a
@ -278,15 +275,14 @@ class Application(object):
add_statistic = statistics.append
for statistic in defaults.STATISTIC_NAMES + ("files",):
value = self.file_checker_manager.statistics[statistic]
total_description = "total " + statistic + " processed"
total_description = f"total {statistic} processed"
add_statistic((total_description, value))
per_second_description = statistic + " processed per second"
per_second_description = f"{statistic} processed per second"
add_statistic((per_second_description, int(value / time_elapsed)))
self.formatter.show_benchmarks(statistics)
def report_errors(self):
# type: () -> None
def report_errors(self) -> None:
"""Report all the errors found by flake8 3.0.
This also updates the :attr:`result_count` attribute with the total
@ -308,8 +304,7 @@ class Application(object):
self.formatter.show_statistics(self.guide.stats)
def initialize(self, argv):
# type: (List[str]) -> None
def initialize(self, argv: List[str]) -> None:
"""Initialize the application to be run.
This finds the plugins, registers their options, and parses the
@ -343,14 +338,12 @@ class Application(object):
self.report_benchmarks()
self.formatter.stop()
def _run(self, argv):
# type: (List[str]) -> None
def _run(self, argv: List[str]) -> None:
self.initialize(argv)
self.run_checks()
self.report()
def run(self, argv):
# type: (List[str]) -> None
def run(self, argv: List[str]) -> None:
"""Run our application.
This method will also handle KeyboardInterrupt exceptions for the

View file

@ -1,12 +1,12 @@
"""Command-line implementation of flake8."""
import sys
from typing import List, Optional
from typing import List
from typing import Optional
from flake8.main import application
def main(argv=None):
# type: (Optional[List[str]]) -> None
def main(argv: Optional[List[str]] = None) -> None:
"""Execute the main bit of the application.
This handles the creation of an instance of :class:`Application`, runs it,

View file

@ -1,10 +1,9 @@
"""Module containing the logic for our debugging logic."""
from __future__ import print_function
import argparse
import json
import platform
from typing import Dict, List
from typing import Dict
from typing import List
class DebugAction(argparse.Action):
@ -17,7 +16,7 @@ class DebugAction(argparse.Action):
used to delay response.
"""
self._option_manager = kwargs.pop("option_manager")
super(DebugAction, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
"""Perform the argparse action for printing debug information."""
@ -60,6 +59,6 @@ def plugins_from(option_manager):
]
def dependencies(): # type: () -> List[Dict[str, str]]
def dependencies() -> List[Dict[str, str]]:
"""Generate the list of dependencies we care about."""
return []

View file

@ -6,8 +6,7 @@ from flake8 import defaults
from flake8.main import debug
def register_preliminary_options(parser):
# type: (argparse.ArgumentParser) -> None
def register_preliminary_options(parser: argparse.ArgumentParser) -> None:
"""Register the preliminary options on our OptionManager.
The preliminary options include:
@ -64,7 +63,7 @@ def register_preliminary_options(parser):
class JobsArgument:
"""Type callback for the --jobs argument."""
def __init__(self, arg): # type: (str) -> None
def __init__(self, arg: str) -> None:
"""Parse and validate the --jobs argument.
:param str arg:
@ -78,7 +77,7 @@ class JobsArgument:
self.n_jobs = int(arg)
else:
raise argparse.ArgumentTypeError(
"{!r} must be 'auto' or an integer.".format(arg),
f"{arg!r} must be 'auto' or an integer.",
)
def __str__(self):

View file

@ -5,7 +5,8 @@ applies the user-specified command-line configuration on top of it.
"""
import argparse
import logging
from typing import List, Tuple
from typing import List
from typing import Tuple
from flake8.options import config
from flake8.options.manager import OptionManager
@ -14,10 +15,10 @@ LOG = logging.getLogger(__name__)
def aggregate_options(
manager, # type: OptionManager
config_finder, # type: config.ConfigFileFinder
argv, # type: List[str]
): # type: (...) -> Tuple[argparse.Namespace, List[str]]
manager: OptionManager,
config_finder: config.ConfigFileFinder,
argv: List[str],
) -> Tuple[argparse.Namespace, List[str]]:
"""Aggregate and merge CLI and config file options.
:param flake8.options.manager.OptionManager manager:

View file

@ -3,7 +3,9 @@ import collections
import configparser
import logging
import os.path
from typing import List, Optional, Tuple
from typing import List
from typing import Optional
from typing import Tuple
from flake8 import utils
@ -12,17 +14,16 @@ LOG = logging.getLogger(__name__)
__all__ = ("ConfigFileFinder", "MergedConfigParser")
class ConfigFileFinder(object):
class ConfigFileFinder:
"""Encapsulate the logic for finding and reading config files."""
def __init__(
self,
program_name,
extra_config_files=None,
config_file=None,
ignore_config_files=False,
):
# type: (str, Optional[List[str]], Optional[str], bool) -> None
program_name: str,
extra_config_files: Optional[List[str]] = None,
config_file: Optional[str] = None,
ignore_config_files: bool = False,
) -> None:
"""Initialize object to find config files.
:param str program_name:
@ -50,16 +51,15 @@ class ConfigFileFinder(object):
self.user_config_file = self._user_config_file(program_name)
# List of filenames to find in the local/project directory
self.project_filenames = ("setup.cfg", "tox.ini", "." + program_name)
self.project_filenames = ("setup.cfg", "tox.ini", f".{program_name}")
self.local_directory = os.path.abspath(os.curdir)
@staticmethod
def _user_config_file(program_name):
# type: (str) -> str
def _user_config_file(program_name: str) -> str:
if utils.is_windows():
home_dir = os.path.expanduser("~")
config_file_basename = "." + program_name
config_file_basename = f".{program_name}"
else:
home_dir = os.environ.get(
"XDG_CONFIG_HOME", os.path.expanduser("~/.config")
@ -69,8 +69,9 @@ class ConfigFileFinder(object):
return os.path.join(home_dir, config_file_basename)
@staticmethod
def _read_config(*files):
# type: (*str) -> Tuple[configparser.RawConfigParser, List[str]]
def _read_config(
*files: str,
) -> Tuple[configparser.RawConfigParser, List[str]]:
config = configparser.RawConfigParser()
found_files = []
@ -91,8 +92,7 @@ class ConfigFileFinder(object):
)
return (config, found_files)
def cli_config(self, files):
# type: (str) -> configparser.RawConfigParser
def cli_config(self, files: str) -> configparser.RawConfigParser:
"""Read and parse the config file specified on the command-line."""
config, found_files = self._read_config(files)
if found_files:
@ -154,7 +154,7 @@ class ConfigFileFinder(object):
return config
class MergedConfigParser(object):
class MergedConfigParser:
"""Encapsulate merging different types of configuration files.
This parses out the options registered that were specified in the
@ -344,7 +344,7 @@ def get_local_plugins(config_finder):
base_dirs = {os.path.dirname(cf) for cf in config_files}
section = "%s:local-plugins" % config_finder.program_name
section = f"{config_finder.program_name}:local-plugins"
for plugin_type in ["extension", "report"]:
if config.has_option(section, plugin_type):
local_plugins_string = config.get(section, plugin_type).strip()
@ -358,7 +358,7 @@ def get_local_plugins(config_finder):
raw_paths = utils.parse_comma_separated_list(
config.get(section, "paths").strip()
)
norm_paths = [] # type: List[str]
norm_paths: List[str] = []
for base_dir in base_dirs:
norm_paths.extend(
path

View file

@ -5,14 +5,25 @@ import contextlib
import enum
import functools
import logging
from typing import Any, Callable, cast, Dict, Generator, List, Mapping
from typing import Optional, Sequence, Set, Tuple, Union
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import Generator
from typing import List
from typing import Mapping
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import Union
from flake8 import utils
if False: # TYPE_CHECKING
if TYPE_CHECKING:
from typing import NoReturn
from typing import Type
LOG = logging.getLogger(__name__)
@ -21,7 +32,7 @@ LOG = logging.getLogger(__name__)
_ARG = enum.Enum("_ARG", "NO")
_optparse_callable_map = {
_optparse_callable_map: Dict[str, Union[Type[Any], _ARG]] = {
"int": int,
"long": int,
"string": str,
@ -30,27 +41,25 @@ _optparse_callable_map = {
"choice": _ARG.NO,
# optparse allows this but does not document it
"str": str,
} # type: Dict[str, Union[Type[Any], _ARG]]
}
class _CallbackAction(argparse.Action):
"""Shim for optparse-style callback actions."""
def __init__(self, *args, **kwargs):
# type: (*Any, **Any) -> None
def __init__(self, *args: Any, **kwargs: Any) -> None:
self._callback = kwargs.pop("callback")
self._callback_args = kwargs.pop("callback_args", ())
self._callback_kwargs = kwargs.pop("callback_kwargs", {})
super(_CallbackAction, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
def __call__(
self,
parser, # type: argparse.ArgumentParser
namespace, # type: argparse.Namespace
values, # type: Optional[Union[Sequence[str], str]]
option_string=None, # type: Optional[str]
):
# type: (...) -> None
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Optional[Union[Sequence[str], str]],
option_string: Optional[str] = None,
) -> None:
if not values:
values = None
elif isinstance(values, list) and len(values) > 1:
@ -61,23 +70,24 @@ class _CallbackAction(argparse.Action):
values,
parser,
*self._callback_args,
**self._callback_kwargs
**self._callback_kwargs,
)
def _flake8_normalize(value, *args, **kwargs):
# type: (str, *str, **bool) -> Union[str, List[str]]
def _flake8_normalize(
value: str, *args: str, **kwargs: bool
) -> Union[str, List[str]]:
comma_separated_list = kwargs.pop("comma_separated_list", False)
normalize_paths = kwargs.pop("normalize_paths", False)
if kwargs:
raise TypeError("Unexpected keyword args: {}".format(kwargs))
raise TypeError(f"Unexpected keyword args: {kwargs}")
ret = value # type: Union[str, List[str]]
if comma_separated_list and isinstance(ret, utils.string_types):
ret: Union[str, List[str]] = value
if comma_separated_list and isinstance(ret, str):
ret = utils.parse_comma_separated_list(value)
if normalize_paths:
if isinstance(ret, utils.string_types):
if isinstance(ret, str):
ret = utils.normalize_path(ret, *args)
else:
ret = utils.normalize_paths(ret, *args)
@ -85,34 +95,34 @@ def _flake8_normalize(value, *args, **kwargs):
return ret
class Option(object):
class Option:
"""Our wrapper around an argparse argument parsers to add features."""
def __init__(
self,
short_option_name=_ARG.NO, # type: Union[str, _ARG]
long_option_name=_ARG.NO, # type: Union[str, _ARG]
short_option_name: Union[str, _ARG] = _ARG.NO,
long_option_name: Union[str, _ARG] = _ARG.NO,
# Options below here are taken from the optparse.Option class
action=_ARG.NO, # type: Union[str, Type[argparse.Action], _ARG]
default=_ARG.NO, # type: Union[Any, _ARG]
type=_ARG.NO, # type: Union[str, Callable[..., Any], _ARG]
dest=_ARG.NO, # type: Union[str, _ARG]
nargs=_ARG.NO, # type: Union[int, str, _ARG]
const=_ARG.NO, # type: Union[Any, _ARG]
choices=_ARG.NO, # type: Union[Sequence[Any], _ARG]
help=_ARG.NO, # type: Union[str, _ARG]
metavar=_ARG.NO, # type: Union[str, _ARG]
action: Union[str, Type[argparse.Action], _ARG] = _ARG.NO,
default: Union[Any, _ARG] = _ARG.NO,
type: Union[str, Callable[..., Any], _ARG] = _ARG.NO,
dest: Union[str, _ARG] = _ARG.NO,
nargs: Union[int, str, _ARG] = _ARG.NO,
const: Union[Any, _ARG] = _ARG.NO,
choices: Union[Sequence[Any], _ARG] = _ARG.NO,
help: Union[str, _ARG] = _ARG.NO,
metavar: Union[str, _ARG] = _ARG.NO,
# deprecated optparse-only options
callback=_ARG.NO, # type: Union[Callable[..., Any], _ARG]
callback_args=_ARG.NO, # type: Union[Sequence[Any], _ARG]
callback_kwargs=_ARG.NO, # type: Union[Mapping[str, Any], _ARG]
callback: Union[Callable[..., Any], _ARG] = _ARG.NO,
callback_args: Union[Sequence[Any], _ARG] = _ARG.NO,
callback_kwargs: Union[Mapping[str, Any], _ARG] = _ARG.NO,
# Options below are taken from argparse.ArgumentParser.add_argument
required=_ARG.NO, # type: Union[bool, _ARG]
required: Union[bool, _ARG] = _ARG.NO,
# Options below here are specific to Flake8
parse_from_config=False, # type: bool
comma_separated_list=False, # type: bool
normalize_paths=False, # type: bool
): # type: (...) -> None
parse_from_config: bool = False,
comma_separated_list: bool = False,
normalize_paths: bool = False,
) -> None:
"""Initialize an Option instance.
The following are all passed directly through to argparse.
@ -203,7 +213,7 @@ class Option(object):
nargs = 0
# optparse -> argparse for `type`
if isinstance(type, utils.string_types):
if isinstance(type, str):
LOG.warning(
"option %s: please update from optparse string `type=` to "
"argparse callable `type=` -- this will be an error in the "
@ -240,7 +250,7 @@ class Option(object):
self.help = help
self.metavar = metavar
self.required = required
self.option_kwargs = {
self.option_kwargs: Dict[str, Union[Any, _ARG]] = {
"action": self.action,
"default": self.default,
"type": self.type,
@ -254,14 +264,14 @@ class Option(object):
"help": self.help,
"metavar": self.metavar,
"required": self.required,
} # type: Dict[str, Union[Any, _ARG]]
}
# Set our custom attributes
self.parse_from_config = parse_from_config
self.comma_separated_list = comma_separated_list
self.normalize_paths = normalize_paths
self.config_name = None # type: Optional[str]
self.config_name: Optional[str] = None
if parse_from_config:
if long_option_name is _ARG.NO:
raise ValueError(
@ -273,26 +283,23 @@ class Option(object):
self._opt = None
@property
def filtered_option_kwargs(self): # type: () -> Dict[str, Any]
def filtered_option_kwargs(self) -> Dict[str, Any]:
"""Return any actually-specified arguments."""
return {
k: v for k, v in self.option_kwargs.items() if v is not _ARG.NO
}
def __repr__(self): # type: () -> str # noqa: D105
def __repr__(self) -> str: # noqa: D105
parts = []
for arg in self.option_args:
parts.append(arg)
for k, v in self.filtered_option_kwargs.items():
parts.append("{}={!r}".format(k, v))
return "Option({})".format(", ".join(parts))
parts.append(f"{k}={v!r}")
return f"Option({', '.join(parts)})"
def normalize(self, value, *normalize_args):
# type: (Any, *str) -> Any
def normalize(self, value: Any, *normalize_args: str) -> Any:
"""Normalize the value based on the option configuration."""
if self.comma_separated_list and isinstance(
value, utils.string_types
):
if self.comma_separated_list and isinstance(value, str):
value = utils.parse_comma_separated_list(value)
if self.normalize_paths:
@ -303,8 +310,9 @@ class Option(object):
return value
def normalize_from_setuptools(self, value):
# type: (str) -> Union[int, float, complex, bool, str]
def normalize_from_setuptools(
self, value: str
) -> Union[int, float, complex, bool, str]:
"""Normalize the value received from setuptools."""
value = self.normalize(value)
if self.type is int or self.action == "count":
@ -321,13 +329,12 @@ class Option(object):
return False
return value
def to_argparse(self):
# type: () -> Tuple[List[str], Dict[str, Any]]
def to_argparse(self) -> Tuple[List[str], Dict[str, Any]]:
"""Convert a Flake8 Option to argparse ``add_argument`` arguments."""
return self.option_args, self.filtered_option_kwargs
@property
def to_optparse(self): # type: () -> NoReturn
def to_optparse(self) -> "NoReturn":
"""No longer functional."""
raise AttributeError("to_optparse: flake8 now uses argparse")
@ -337,16 +344,16 @@ PluginVersion = collections.namedtuple(
)
class OptionManager(object):
class OptionManager:
"""Manage Options and OptionParser while adding post-processing."""
def __init__(
self,
prog,
version,
usage="%(prog)s [options] file file ...",
parents=None,
): # type: (str, str, str, Optional[List[argparse.ArgumentParser]]) -> None # noqa: E501
prog: str,
version: str,
usage: str = "%(prog)s [options] file file ...",
parents: Optional[List[argparse.ArgumentParser]] = None,
) -> None: # noqa: E501
"""Initialize an instance of an OptionManager.
:param str prog:
@ -362,10 +369,10 @@ class OptionManager(object):
if parents is None:
parents = []
self.parser = argparse.ArgumentParser(
self.parser: argparse.ArgumentParser = argparse.ArgumentParser(
prog=prog, usage=usage, parents=parents
) # type: argparse.ArgumentParser
self._current_group = None # type: Optional[argparse._ArgumentGroup]
)
self._current_group: Optional[argparse._ArgumentGroup] = None
self.version_action = cast(
"argparse._VersionAction",
self.parser.add_argument(
@ -373,16 +380,16 @@ class OptionManager(object):
),
)
self.parser.add_argument("filenames", nargs="*", metavar="filename")
self.config_options_dict = {} # type: Dict[str, Option]
self.options = [] # type: List[Option]
self.config_options_dict: Dict[str, Option] = {}
self.options: List[Option] = []
self.program_name = prog
self.version = version
self.registered_plugins = set() # type: Set[PluginVersion]
self.extended_default_ignore = set() # type: Set[str]
self.extended_default_select = set() # type: Set[str]
self.registered_plugins: Set[PluginVersion] = set()
self.extended_default_ignore: Set[str] = set()
self.extended_default_select: Set[str] = set()
@contextlib.contextmanager
def group(self, name): # type: (str) -> Generator[None, None, None]
def group(self, name: str) -> Generator[None, None, None]:
"""Attach options to an argparse group during this context."""
group = self.parser.add_argument_group(name)
self._current_group, orig_group = group, self._current_group
@ -391,7 +398,7 @@ class OptionManager(object):
finally:
self._current_group = orig_group
def add_option(self, *args, **kwargs): # type: (*Any, **Any) -> None
def add_option(self, *args: Any, **kwargs: Any) -> None:
"""Create and register a new option.
See parameters for :class:`~flake8.options.manager.Option` for
@ -416,8 +423,7 @@ class OptionManager(object):
self.config_options_dict[name.replace("_", "-")] = option
LOG.debug('Registered option "%s".', option)
def remove_from_default_ignore(self, error_codes):
# type: (Sequence[str]) -> None
def remove_from_default_ignore(self, error_codes: Sequence[str]) -> None:
"""Remove specified error codes from the default ignore list.
:param list error_codes:
@ -435,8 +441,7 @@ class OptionManager(object):
error_code,
)
def extend_default_ignore(self, error_codes):
# type: (Sequence[str]) -> None
def extend_default_ignore(self, error_codes: Sequence[str]) -> None:
"""Extend the default ignore list with the error codes provided.
:param list error_codes:
@ -446,8 +451,7 @@ class OptionManager(object):
LOG.debug("Extending default ignore list with %r", error_codes)
self.extended_default_ignore.update(error_codes)
def extend_default_select(self, error_codes):
# type: (Sequence[str]) -> None
def extend_default_select(self, error_codes: Sequence[str]) -> None:
"""Extend the default select list with the error codes provided.
:param list error_codes:
@ -458,22 +462,21 @@ class OptionManager(object):
self.extended_default_select.update(error_codes)
def generate_versions(
self, format_str="%(name)s: %(version)s", join_on=", "
):
# type: (str, str) -> str
self, format_str: str = "%(name)s: %(version)s", join_on: str = ", "
) -> str:
"""Generate a comma-separated list of versions of plugins."""
return join_on.join(
format_str % plugin._asdict()
for plugin in sorted(self.registered_plugins)
)
def update_version_string(self): # type: () -> None
def update_version_string(self) -> None:
"""Update the flake8 version string."""
self.version_action.version = "{} ({}) {}".format(
self.version, self.generate_versions(), utils.get_python_version()
)
def generate_epilog(self): # type: () -> None
def generate_epilog(self) -> None:
"""Create an epilog with the version and name of each of plugin."""
plugin_version_format = "%(name)s: %(version)s"
self.parser.epilog = "Installed plugins: " + self.generate_versions(
@ -482,10 +485,9 @@ class OptionManager(object):
def parse_args(
self,
args=None, # type: Optional[List[str]]
values=None, # type: Optional[argparse.Namespace]
):
# type: (...) -> Tuple[argparse.Namespace, List[str]]
args: Optional[List[str]] = None,
values: Optional[argparse.Namespace] = None,
) -> Tuple[argparse.Namespace, List[str]]:
"""Proxy to calling the OptionParser's parse_args method."""
self.generate_epilog()
self.update_version_string()
@ -495,8 +497,9 @@ class OptionManager(object):
# TODO: refactor callers to not need this
return parsed_args, parsed_args.filenames
def parse_known_args(self, args=None):
# type: (Optional[List[str]]) -> Tuple[argparse.Namespace, List[str]]
def parse_known_args(
self, args: Optional[List[str]] = None
) -> Tuple[argparse.Namespace, List[str]]:
"""Parse only the known arguments from the argument values.
Replicate a little argparse behaviour while we're still on
@ -506,8 +509,9 @@ class OptionManager(object):
self.update_version_string()
return self.parser.parse_known_args(args)
def register_plugin(self, name, version, local=False):
# type: (str, str, bool) -> None
def register_plugin(
self, name: str, version: str, local: bool = False
) -> None:
"""Register a plugin relying on the OptionManager.
:param str name:

View file

@ -1,6 +1,10 @@
"""Plugin loading and management logic and classes."""
import logging
from typing import Any, Dict, List, Optional, Set
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from flake8 import exceptions
from flake8 import utils
@ -13,7 +17,7 @@ __all__ = ("Checkers", "Plugin", "PluginManager", "ReportFormatters")
NO_GROUP_FOUND = object()
class Plugin(object):
class Plugin:
"""Wrap an EntryPoint from setuptools and other logic."""
def __init__(self, name, entry_point, local=False):
@ -31,16 +35,16 @@ class Plugin(object):
self.name = name
self.entry_point = entry_point
self.local = local
self._plugin = None # type: Any
self._plugin: Any = None
self._parameters = None
self._parameter_names = None # type: Optional[List[str]]
self._parameter_names: Optional[List[str]] = None
self._group = None
self._plugin_name = None
self._version = None
def __repr__(self): # type: () -> str
def __repr__(self) -> str:
"""Provide an easy to read description of the current plugin."""
return 'Plugin(name="{0}", entry_point="{1}")'.format(
return 'Plugin(name="{}", entry_point="{}")'.format(
self.name, self.entry_point.value
)
@ -84,7 +88,7 @@ class Plugin(object):
return self._parameters
@property
def parameter_names(self): # type: () -> List[str]
def parameter_names(self) -> List[str]:
"""List of argument names that need to be passed to the plugin."""
if self._parameter_names is None:
self._parameter_names = list(self.parameters)
@ -100,7 +104,7 @@ class Plugin(object):
return self._plugin
@property
def version(self): # type: () -> str
def version(self) -> str:
"""Return the version of the plugin."""
version = self._version
if version is None:
@ -134,9 +138,9 @@ class Plugin(object):
self._plugin = self.entry_point.load()
if not callable(self._plugin):
msg = (
"Plugin %r is not a callable. It might be written for an"
" older version of flake8 and might not work with this"
" version" % self._plugin
f"Plugin {self._plugin!r} is not a callable. It might be "
f"written for an older version of flake8 and might not work "
f"with this version"
)
LOG.critical(msg)
raise TypeError(msg)
@ -219,11 +223,12 @@ class Plugin(object):
self.disable(optmanager)
class PluginManager(object): # pylint: disable=too-few-public-methods
class PluginManager: # pylint: disable=too-few-public-methods
"""Find and manage plugins consistently."""
def __init__(self, namespace, local_plugins=None):
# type: (str, Optional[List[str]]) -> None
def __init__(
self, namespace: str, local_plugins: Optional[List[str]] = None
) -> None:
"""Initialize the manager.
:param str namespace:
@ -232,8 +237,8 @@ class PluginManager(object): # pylint: disable=too-few-public-methods
Plugins from config (as "X = path.to:Plugin" strings).
"""
self.namespace = namespace
self.plugins = {} # type: Dict[str, Plugin]
self.names = [] # type: List[str]
self.plugins: Dict[str, Plugin] = {}
self.names: List[str] = []
self._load_local_plugins(local_plugins or [])
self._load_entrypoint_plugins()
@ -310,7 +315,7 @@ class PluginManager(object): # pylint: disable=too-few-public-methods
:rtype:
tuple
"""
plugins_seen = set() # type: Set[str]
plugins_seen: Set[str] = set()
for entry_point_name in self.names:
plugin = self.plugins[entry_point_name]
plugin_name = plugin.plugin_name
@ -342,10 +347,10 @@ def version_for(plugin):
return getattr(module, "__version__", None)
class PluginTypeManager(object):
class PluginTypeManager:
"""Parent class for most of the specific plugin types."""
namespace = None # type: str
namespace: str
def __init__(self, local_plugins=None):
"""Initialize the plugin type's manager.

View file

@ -1,16 +1,11 @@
"""Plugin built-in to Flake8 to treat pyflakes as a plugin."""
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os
from typing import List
import pyflakes
import pyflakes.checker
from flake8 import utils
FLAKE8_PYFLAKES_CODES = {
"UnusedImport": "F401",
"ImportShadowedByLoopVar": "F402",
@ -69,8 +64,8 @@ class FlakesChecker(pyflakes.checker.Checker):
name = "pyflakes"
version = pyflakes.__version__
with_doctest = False
include_in_doctest = [] # type: List[str]
exclude_from_doctest = [] # type: List[str]
include_in_doctest: List[str] = []
exclude_from_doctest: List[str] = []
def __init__(self, tree, file_tokens, filename):
"""Initialize the PyFlakes plugin with an AST tree and filename."""
@ -96,7 +91,7 @@ class FlakesChecker(pyflakes.checker.Checker):
if overlaped_by:
with_doctest = True
super(FlakesChecker, self).__init__(
super().__init__(
tree,
filename=filename,
withDoctest=with_doctest,
@ -150,7 +145,7 @@ class FlakesChecker(pyflakes.checker.Checker):
if included_file == "":
continue
if not included_file.startswith((os.sep, "./", "~/")):
included_files.append("./" + included_file)
included_files.append(f"./{included_file}")
else:
included_files.append(included_file)
cls.include_in_doctest = utils.normalize_paths(included_files)
@ -160,7 +155,7 @@ class FlakesChecker(pyflakes.checker.Checker):
if excluded_file == "":
continue
if not excluded_file.startswith((os.sep, "./", "~/")):
excluded_files.append("./" + excluded_file)
excluded_files.append(f"./{excluded_file}")
else:
excluded_files.append(excluded_file)
cls.exclude_from_doctest = utils.normalize_paths(excluded_files)
@ -170,10 +165,10 @@ class FlakesChecker(pyflakes.checker.Checker):
)
if inc_exc:
raise ValueError(
'"%s" was specified in both the '
"include-in-doctest and exclude-from-doctest "
"options. You are not allowed to specify it in "
"both for doctesting." % inc_exc
f"{inc_exc!r} was specified in both the "
f"include-in-doctest and exclude-from-doctest "
f"options. You are not allowed to specify it in "
f"both for doctesting."
)
def run(self):

View file

@ -3,9 +3,13 @@ import argparse
import ast
import contextlib
import logging
import sys
import tokenize
from typing import Any, Dict, Generator, List, Optional, Tuple
from typing import Any
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Tuple
import flake8
from flake8 import defaults
@ -25,7 +29,7 @@ _LogicalMapping = List[Tuple[int, Tuple[int, int]]]
_Logical = Tuple[List[str], List[str], _LogicalMapping]
class FileProcessor(object):
class FileProcessor:
"""Processes a file and holdes state.
This processes a file by generating tokens, logical and physical lines,
@ -56,8 +60,12 @@ class FileProcessor(object):
#: always ``False``, included for compatibility
noqa = False
def __init__(self, filename, options, lines=None):
# type: (str, argparse.Namespace, Optional[List[str]]) -> None
def __init__(
self,
filename: str,
options: argparse.Namespace,
lines: Optional[List[str]] = None,
) -> None:
"""Initialice our file processor.
:param str filename:
@ -74,13 +82,13 @@ class FileProcessor(object):
#: Number of blank lines
self.blank_lines = 0
#: Checker states for each plugin?
self._checker_states = {} # type: Dict[str, Dict[Any, Any]]
self._checker_states: Dict[str, Dict[Any, Any]] = {}
#: Current checker state
self.checker_state = {} # type: Dict[Any, Any]
self.checker_state: Dict[Any, Any] = {}
#: User provided option for hang closing
self.hang_closing = options.hang_closing
#: Character used for indentation
self.indent_char = None # type: Optional[str]
self.indent_char: Optional[str] = None
#: Current level of indentation
self.indent_level = 0
#: Number of spaces used for indentation
@ -104,19 +112,19 @@ class FileProcessor(object):
#: Previous unindented (i.e. top-level) logical line
self.previous_unindented_logical_line = ""
#: Current set of tokens
self.tokens = [] # type: List[_Token]
self.tokens: List[_Token] = []
#: Total number of lines in the file
self.total_lines = len(self.lines)
#: Verbosity level of Flake8
self.verbose = options.verbose
#: Statistics dictionary
self.statistics = {"logical lines": 0}
self._file_tokens = None # type: Optional[List[_Token]]
self._file_tokens: Optional[List[_Token]] = None
# map from line number to the line we'll search for `noqa` in
self._noqa_line_mapping = None # type: Optional[Dict[int, str]]
self._noqa_line_mapping: Optional[Dict[int, str]] = None
@property
def file_tokens(self): # type: () -> List[_Token]
def file_tokens(self) -> List[_Token]:
"""Return the complete set of tokens for a file.
Accessing this attribute *may* raise an InvalidSyntax exception.
@ -135,28 +143,28 @@ class FileProcessor(object):
return self._file_tokens
@contextlib.contextmanager
def inside_multiline(self, line_number):
# type: (int) -> Generator[None, None, None]
def inside_multiline(
self, line_number: int
) -> Generator[None, None, None]:
"""Context-manager to toggle the multiline attribute."""
self.line_number = line_number
self.multiline = True
yield
self.multiline = False
def reset_blank_before(self): # type: () -> None
def reset_blank_before(self) -> None:
"""Reset the blank_before attribute to zero."""
self.blank_before = 0
def delete_first_token(self): # type: () -> None
def delete_first_token(self) -> None:
"""Delete the first token in the list of tokens."""
del self.tokens[0]
def visited_new_blank_line(self): # type: () -> None
def visited_new_blank_line(self) -> None:
"""Note that we visited a new blank line."""
self.blank_lines += 1
def update_state(self, mapping):
# type: (_LogicalMapping) -> None
def update_state(self, mapping: _LogicalMapping) -> None:
"""Update the indent level based on the logical line mapping."""
(start_row, start_col) = mapping[0][1]
start_line = self.lines[start_row - 1]
@ -164,15 +172,14 @@ class FileProcessor(object):
if self.blank_before < self.blank_lines:
self.blank_before = self.blank_lines
def update_checker_state_for(self, plugin):
# type: (Dict[str, Any]) -> None
def update_checker_state_for(self, plugin: Dict[str, Any]) -> None:
"""Update the checker_state attribute for the plugin."""
if "checker_state" in plugin["parameters"]:
self.checker_state = self._checker_states.setdefault(
plugin["name"], {}
)
def next_logical_line(self): # type: () -> None
def next_logical_line(self) -> None:
"""Record the previous logical line.
This also resets the tokens list and the blank_lines count.
@ -185,11 +192,11 @@ class FileProcessor(object):
self.blank_lines = 0
self.tokens = []
def build_logical_line_tokens(self): # type: () -> _Logical
def build_logical_line_tokens(self) -> _Logical:
"""Build the mapping, comments, and logical line lists."""
logical = []
comments = []
mapping = [] # type: _LogicalMapping
mapping: _LogicalMapping = []
length = 0
previous_row = previous_column = None
for token_type, text, start, end, line in self.tokens:
@ -211,7 +218,7 @@ class FileProcessor(object):
if previous_text == "," or (
previous_text not in "{[(" and text not in "}])"
):
text = " " + text
text = f" {text}"
elif previous_column != start_column:
text = line[previous_column:start_column] + text
logical.append(text)
@ -220,12 +227,11 @@ class FileProcessor(object):
(previous_row, previous_column) = end
return comments, logical, mapping
def build_ast(self): # type: () -> ast.AST
def build_ast(self) -> ast.AST:
"""Build an abstract syntax tree from the list of lines."""
return ast.parse("".join(self.lines))
def build_logical_line(self):
# type: () -> Tuple[str, str, _LogicalMapping]
def build_logical_line(self) -> Tuple[str, str, _LogicalMapping]:
"""Build a logical line from the current tokens list."""
comments, logical, mapping_list = self.build_logical_line_tokens()
joined_comments = "".join(comments)
@ -233,8 +239,7 @@ class FileProcessor(object):
self.statistics["logical lines"] += 1
return joined_comments, self.logical_line, mapping_list
def split_line(self, token):
# type: (_Token) -> Generator[str, None, None]
def split_line(self, token: _Token) -> Generator[str, None, None]:
"""Split a physical line's line based on new-lines.
This also auto-increments the line number for the caller.
@ -243,8 +248,11 @@ class FileProcessor(object):
yield line
self.line_number += 1
def keyword_arguments_for(self, parameters, arguments=None):
# type: (Dict[str, bool], Optional[Dict[str, Any]]) -> Dict[str, Any]
def keyword_arguments_for(
self,
parameters: Dict[str, bool],
arguments: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Generate the keyword arguments for a list of parameters."""
if arguments is None:
arguments = {}
@ -265,7 +273,7 @@ class FileProcessor(object):
)
return arguments
def generate_tokens(self): # type: () -> Generator[_Token, None, None]
def generate_tokens(self) -> Generator[_Token, None, None]:
"""Tokenize the file and yield the tokens.
:raises flake8.exceptions.InvalidSyntax:
@ -281,13 +289,14 @@ class FileProcessor(object):
except (tokenize.TokenError, SyntaxError) as exc:
raise exceptions.InvalidSyntax(exception=exc)
def _noqa_line_range(self, min_line, max_line):
# type: (int, int) -> Dict[int, str]
def _noqa_line_range(
self, min_line: int, max_line: int
) -> Dict[int, str]:
line_range = range(min_line, max_line + 1)
joined = "".join(self.lines[min_line - 1 : max_line])
return dict.fromkeys(line_range, joined)
def noqa_line_for(self, line_number): # type: (int) -> Optional[str]
def noqa_line_for(self, line_number: int) -> Optional[str]:
"""Retrieve the line which will be used to determine noqa."""
if self._noqa_line_mapping is None:
try:
@ -327,7 +336,7 @@ class FileProcessor(object):
# retrieve a physical line (since none exist).
return self._noqa_line_mapping.get(line_number)
def next_line(self): # type: () -> str
def next_line(self) -> str:
"""Get the next line from the list."""
if self.line_number >= self.total_lines:
return ""
@ -337,8 +346,7 @@ class FileProcessor(object):
self.indent_char = line[0]
return line
def read_lines(self):
# type: () -> List[str]
def read_lines(self) -> List[str]:
"""Read the lines for this file checker."""
if self.filename is None or self.filename == "-":
self.filename = self.options.stdin_display_name or "stdin"
@ -347,13 +355,8 @@ class FileProcessor(object):
lines = self.read_lines_from_filename()
return lines
def _readlines_py2(self):
# type: () -> List[str]
with open(self.filename, "rU") as fd:
return fd.readlines()
def _readlines_py3(self):
# type: () -> List[str]
def read_lines_from_filename(self) -> List[str]:
"""Read the lines for a file."""
try:
with tokenize.open(self.filename) as fd:
return fd.readlines()
@ -363,22 +366,11 @@ class FileProcessor(object):
with open(self.filename, encoding="latin-1") as fd:
return fd.readlines()
def read_lines_from_filename(self):
# type: () -> List[str]
"""Read the lines for a file."""
if (2, 6) <= sys.version_info < (3, 0):
readlines = self._readlines_py2
elif (3, 0) <= sys.version_info < (4, 0):
readlines = self._readlines_py3
return readlines()
def read_lines_from_stdin(self):
# type: () -> List[str]
def read_lines_from_stdin(self) -> List[str]:
"""Read the lines from standard in."""
return utils.stdin_get_lines()
def should_ignore_file(self):
# type: () -> bool
def should_ignore_file(self) -> bool:
"""Check if ``flake8: noqa`` is in the file to be ignored.
:returns:
@ -400,8 +392,7 @@ class FileProcessor(object):
else:
return False
def strip_utf_bom(self):
# type: () -> None
def strip_utf_bom(self) -> None:
"""Strip the UTF bom from the lines of the file."""
if not self.lines:
# If we have nothing to analyze quit early
@ -418,23 +409,22 @@ class FileProcessor(object):
self.lines[0] = self.lines[0][3:]
def is_eol_token(token): # type: (_Token) -> bool
def is_eol_token(token: _Token) -> bool:
"""Check if the token is an end-of-line token."""
return token[0] in NEWLINE or token[4][token[3][1] :].lstrip() == "\\\n"
def is_multiline_string(token): # type: (_Token) -> bool
def is_multiline_string(token: _Token) -> bool:
"""Check if this is a multiline string."""
return token[0] == tokenize.STRING and "\n" in token[1]
def token_is_newline(token): # type: (_Token) -> bool
def token_is_newline(token: _Token) -> bool:
"""Check if the token type is a newline token type."""
return token[0] in NEWLINE
def count_parentheses(current_parentheses_count, token_text):
# type: (int, str) -> int
def count_parentheses(current_parentheses_count: int, token_text: str) -> int:
"""Count the number of parentheses."""
if token_text in "([{": # nosec
return current_parentheses_count + 1
@ -443,12 +433,12 @@ def count_parentheses(current_parentheses_count, token_text):
return current_parentheses_count
def log_token(log, token): # type: (logging.Logger, _Token) -> None
def log_token(log: logging.Logger, token: _Token) -> None:
"""Log a token to a provided logging object."""
if token[2][0] == token[3][0]:
pos = "[%s:%s]" % (token[2][1] or "", token[3][1])
pos = "[{}:{}]".format(token[2][1] or "", token[3][1])
else:
pos = "l.%s" % token[3][0]
pos = f"l.{token[3][0]}"
log.log(
flake8._EXTRA_VERBOSE,
"l.%s\t%s\t%s\t%r"
@ -458,7 +448,7 @@ def log_token(log, token): # type: (logging.Logger, _Token) -> None
# NOTE(sigmavirus24): This was taken wholesale from
# https://github.com/PyCQA/pycodestyle
def expand_indent(line): # type: (str) -> int
def expand_indent(line: str) -> int:
r"""Return the amount of indentation.
Tabs are expanded to the next multiple of 8.
@ -488,7 +478,7 @@ def expand_indent(line): # type: (str) -> int
# NOTE(sigmavirus24): This was taken wholesale from
# https://github.com/PyCQA/pycodestyle. The in-line comments were edited to be
# more descriptive.
def mutate_string(text): # type: (str) -> str
def mutate_string(text: str) -> str:
"""Replace contents with 'xxx' to prevent syntax matching.
>>> mutate_string('"abc"')

View file

@ -1,19 +1,23 @@
"""Statistic collection logic for Flake8."""
import collections
from typing import Dict, Generator, List, Optional
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
if False: # `typing.TYPE_CHECKING` was introduced in 3.5.2
if TYPE_CHECKING:
from flake8.style_guide import Violation
class Statistics(object):
class Statistics:
"""Manager of aggregated statistics for a run of Flake8."""
def __init__(self): # type: () -> None
def __init__(self) -> None:
"""Initialize the underlying dictionary for our statistics."""
self._store = {} # type: Dict[Key, Statistic]
self._store: Dict[Key, "Statistic"] = {}
def error_codes(self): # type: () -> List[str]
def error_codes(self) -> List[str]:
"""Return all unique error codes stored.
:returns:
@ -23,7 +27,7 @@ class Statistics(object):
"""
return sorted({key.code for key in self._store})
def record(self, error): # type: (Violation) -> None
def record(self, error: "Violation") -> None:
"""Add the fact that the error was seen in the file.
:param error:
@ -37,8 +41,9 @@ class Statistics(object):
self._store[key] = Statistic.create_from(error)
self._store[key].increment()
def statistics_for(self, prefix, filename=None):
# type: (str, Optional[str]) -> Generator[Statistic, None, None]
def statistics_for(
self, prefix: str, filename: Optional[str] = None
) -> Generator["Statistic", None, None]:
"""Generate statistics for the prefix and filename.
If you have a :class:`Statistics` object that has recorded errors,
@ -79,11 +84,11 @@ class Key(collections.namedtuple("Key", ["filename", "code"])):
__slots__ = ()
@classmethod
def create_from(cls, error): # type: (Violation) -> Key
def create_from(cls, error: "Violation") -> "Key":
"""Create a Key from :class:`flake8.style_guide.Violation`."""
return cls(filename=error.filename, code=error.code)
def matches(self, prefix, filename): # type: (str, Optional[str]) -> bool
def matches(self, prefix: str, filename: Optional[str]) -> bool:
"""Determine if this key matches some constraints.
:param str prefix:
@ -102,7 +107,7 @@ class Key(collections.namedtuple("Key", ["filename", "code"])):
)
class Statistic(object):
class Statistic:
"""Simple wrapper around the logic of each statistic.
Instead of maintaining a simple but potentially hard to reason about
@ -110,8 +115,9 @@ class Statistic(object):
convenience methods on it.
"""
def __init__(self, error_code, filename, message, count):
# type: (str, str, str, int) -> None
def __init__(
self, error_code: str, filename: str, message: str, count: int
) -> None:
"""Initialize our Statistic."""
self.error_code = error_code
self.filename = filename
@ -119,7 +125,7 @@ class Statistic(object):
self.count = count
@classmethod
def create_from(cls, error): # type: (Violation) -> Statistic
def create_from(cls, error: "Violation") -> "Statistic":
"""Create a Statistic from a :class:`flake8.style_guide.Violation`."""
return cls(
error_code=error.code,
@ -128,6 +134,6 @@ class Statistic(object):
count=0,
)
def increment(self): # type: () -> None
def increment(self) -> None:
"""Increment the number of times we've seen this error in this file."""
self.count += 1

View file

@ -4,16 +4,23 @@ import collections
import contextlib
import copy
import enum
import functools
import itertools
import linecache
import logging
from typing import Dict, Generator, List, Match, Optional, Sequence, Set
from typing import Tuple, Union
from typing import Dict
from typing import Generator
from typing import List
from typing import Match
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union
from flake8 import defaults
from flake8 import statistics
from flake8 import utils
from flake8._compat import lru_cache
from flake8.formatting import base as base_formatter
__all__ = ("StyleGuide",)
@ -42,8 +49,8 @@ class Decision(enum.Enum):
Selected = "selected error"
@lru_cache(maxsize=512)
def find_noqa(physical_line): # type: (str) -> Optional[Match[str]]
@functools.lru_cache(maxsize=512)
def find_noqa(physical_line: str) -> Optional[Match[str]]:
return defaults.NOQA_INLINE_REGEXP.search(physical_line)
@ -63,8 +70,7 @@ _Violation = collections.namedtuple(
class Violation(_Violation):
"""Class representing a violation reported by Flake8."""
def is_inline_ignored(self, disable_noqa):
# type: (bool) -> bool
def is_inline_ignored(self, disable_noqa: bool) -> bool:
"""Determine if a comment has been added to ignore this line.
:param bool disable_noqa:
@ -105,8 +111,7 @@ class Violation(_Violation):
)
return False
def is_in(self, diff):
# type: (Dict[str, Set[int]]) -> bool
def is_in(self, diff: Dict[str, Set[int]]) -> bool:
"""Determine if the violation is included in a diff's line ranges.
This function relies on the parsed data added via
@ -142,16 +147,16 @@ class Violation(_Violation):
return self.line_number in line_numbers
class DecisionEngine(object):
class DecisionEngine:
"""A class for managing the decision process around violations.
This contains the logic for whether a violation should be reported or
ignored.
"""
def __init__(self, options): # type: (argparse.Namespace) -> None
def __init__(self, options: argparse.Namespace) -> None:
"""Initialize the engine."""
self.cache = {} # type: Dict[str, Decision]
self.cache: Dict[str, Decision] = {}
self.selected = tuple(options.select)
self.extended_selected = tuple(
sorted(options.extended_default_select, reverse=True)
@ -169,16 +174,15 @@ class DecisionEngine(object):
self.using_default_ignore = set(self.ignored) == set(defaults.IGNORE)
self.using_default_select = set(self.selected) == set(defaults.SELECT)
def _in_all_selected(self, code): # type: (str) -> bool
def _in_all_selected(self, code: str) -> bool:
return bool(self.all_selected) and code.startswith(self.all_selected)
def _in_extended_selected(self, code): # type: (str) -> bool
def _in_extended_selected(self, code: str) -> bool:
return bool(self.extended_selected) and code.startswith(
self.extended_selected
)
def was_selected(self, code):
# type: (str) -> Union[Selected, Ignored]
def was_selected(self, code: str) -> Union[Selected, Ignored]:
"""Determine if the code has been selected by the user.
:param str code:
@ -201,8 +205,7 @@ class DecisionEngine(object):
return Ignored.Implicitly
def was_ignored(self, code):
# type: (str) -> Union[Selected, Ignored]
def was_ignored(self, code: str) -> Union[Selected, Ignored]:
"""Determine if the code has been ignored by the user.
:param str code:
@ -219,8 +222,7 @@ class DecisionEngine(object):
return Selected.Implicitly
def more_specific_decision_for(self, code):
# type: (str) -> Decision
def more_specific_decision_for(self, code: str) -> Decision:
select = find_first_match(code, self.all_selected)
extra_select = find_first_match(code, self.extended_selected)
ignore = find_first_match(code, self.ignored)
@ -268,8 +270,7 @@ class DecisionEngine(object):
return Decision.Ignored
return Decision.Selected
def make_decision(self, code):
# type: (str) -> Decision
def make_decision(self, code: str) -> Decision:
"""Decide if code should be ignored or selected."""
LOG.debug('Deciding if "%s" should be reported', code)
selected = self.was_selected(code)
@ -295,8 +296,7 @@ class DecisionEngine(object):
decision = Decision.Ignored # pylint: disable=R0204
return decision
def decision_for(self, code):
# type: (str) -> Decision
def decision_for(self, code: str) -> Decision:
"""Return the decision for a specific code.
This method caches the decisions for codes to avoid retracing the same
@ -318,15 +318,15 @@ class DecisionEngine(object):
return decision
class StyleGuideManager(object):
class StyleGuideManager:
"""Manage multiple style guides for a single run."""
def __init__(
self,
options, # type: argparse.Namespace
formatter, # type: base_formatter.BaseFormatter
decider=None, # type: Optional[DecisionEngine]
): # type: (...) -> None
options: argparse.Namespace,
formatter: base_formatter.BaseFormatter,
decider: Optional[DecisionEngine] = None,
) -> None:
"""Initialize our StyleGuide.
.. todo:: Add parameter documentation.
@ -335,7 +335,7 @@ class StyleGuideManager(object):
self.formatter = formatter
self.stats = statistics.Statistics()
self.decider = decider or DecisionEngine(options)
self.style_guides = [] # type: List[StyleGuide]
self.style_guides: List[StyleGuide] = []
self.default_style_guide = StyleGuide(
options, formatter, self.stats, decider=decider
)
@ -346,8 +346,9 @@ class StyleGuideManager(object):
)
)
def populate_style_guides_with(self, options):
# type: (argparse.Namespace) -> Generator[StyleGuide, None, None]
def populate_style_guides_with(
self, options: argparse.Namespace
) -> Generator["StyleGuide", None, None]:
"""Generate style guides from the per-file-ignores option.
:param options:
@ -367,8 +368,8 @@ class StyleGuideManager(object):
filename=filename, extend_ignore_with=violations
)
@lru_cache(maxsize=None)
def style_guide_for(self, filename): # type: (str) -> StyleGuide
@functools.lru_cache(maxsize=None)
def style_guide_for(self, filename: str) -> "StyleGuide":
"""Find the StyleGuide for the filename in particular."""
guides = sorted(
(g for g in self.style_guides if g.applies_to(filename)),
@ -379,8 +380,9 @@ class StyleGuideManager(object):
return guides[0]
@contextlib.contextmanager
def processing_file(self, filename):
# type: (str) -> Generator[StyleGuide, None, None]
def processing_file(
self, filename: str
) -> Generator["StyleGuide", None, None]:
"""Record the fact that we're processing the file's results."""
guide = self.style_guide_for(filename)
with guide.processing_file(filename):
@ -388,14 +390,13 @@ class StyleGuideManager(object):
def handle_error(
self,
code,
filename,
line_number,
column_number,
text,
physical_line=None,
):
# type: (str, str, int, Optional[int], str, Optional[str]) -> int
code: str,
filename: str,
line_number: int,
column_number: Optional[int],
text: str,
physical_line: Optional[str] = None,
) -> int:
"""Handle an error reported by a check.
:param str code:
@ -423,8 +424,7 @@ class StyleGuideManager(object):
code, filename, line_number, column_number, text, physical_line
)
def add_diff_ranges(self, diffinfo):
# type: (Dict[str, Set[int]]) -> None
def add_diff_ranges(self, diffinfo: Dict[str, Set[int]]) -> None:
"""Update the StyleGuides to filter out information not in the diff.
This provides information to the underlying StyleGuides so that only
@ -437,16 +437,16 @@ class StyleGuideManager(object):
guide.add_diff_ranges(diffinfo)
class StyleGuide(object):
class StyleGuide:
"""Manage a Flake8 user's style guide."""
def __init__(
self,
options, # type: argparse.Namespace
formatter, # type: base_formatter.BaseFormatter
stats, # type: statistics.Statistics
filename=None, # type: Optional[str]
decider=None, # type: Optional[DecisionEngine]
options: argparse.Namespace,
formatter: base_formatter.BaseFormatter,
stats: statistics.Statistics,
filename: Optional[str] = None,
decider: Optional[DecisionEngine] = None,
):
"""Initialize our StyleGuide.
@ -459,14 +459,17 @@ class StyleGuide(object):
self.filename = filename
if self.filename:
self.filename = utils.normalize_path(self.filename)
self._parsed_diff = {} # type: Dict[str, Set[int]]
self._parsed_diff: Dict[str, Set[int]] = {}
def __repr__(self): # type: () -> str
def __repr__(self) -> str:
"""Make it easier to debug which StyleGuide we're using."""
return "<StyleGuide [{}]>".format(self.filename)
return f"<StyleGuide [{self.filename}]>"
def copy(self, filename=None, extend_ignore_with=None):
# type: (Optional[str], Optional[Sequence[str]]) -> StyleGuide
def copy(
self,
filename: Optional[str] = None,
extend_ignore_with: Optional[Sequence[str]] = None,
) -> "StyleGuide":
"""Create a copy of this style guide with different values."""
filename = filename or self.filename
options = copy.deepcopy(self.options)
@ -476,14 +479,15 @@ class StyleGuide(object):
)
@contextlib.contextmanager
def processing_file(self, filename):
# type: (str) -> Generator[StyleGuide, None, None]
def processing_file(
self, filename: str
) -> Generator["StyleGuide", None, None]:
"""Record the fact that we're processing the file's results."""
self.formatter.beginning(filename)
yield self
self.formatter.finished(filename)
def applies_to(self, filename): # type: (str) -> bool
def applies_to(self, filename: str) -> bool:
"""Check if this StyleGuide applies to the file.
:param str filename:
@ -499,12 +503,11 @@ class StyleGuide(object):
return utils.matches_filename(
filename,
patterns=[self.filename],
log_message='{!r} does %(whether)smatch "%(path)s"'.format(self),
log_message=f'{self!r} does %(whether)smatch "%(path)s"',
logger=LOG,
)
def should_report_error(self, code):
# type: (str) -> Decision
def should_report_error(self, code: str) -> Decision:
"""Determine if the error code should be reported or ignored.
This method only cares about the select and ignore rules as specified
@ -520,14 +523,13 @@ class StyleGuide(object):
def handle_error(
self,
code,
filename,
line_number,
column_number,
text,
physical_line=None,
):
# type: (str, str, int, Optional[int], str, Optional[str]) -> int
code: str,
filename: str,
line_number: int,
column_number: Optional[int],
text: str,
physical_line: Optional[str] = None,
) -> int:
"""Handle an error reported by a check.
:param str code:
@ -579,8 +581,7 @@ class StyleGuide(object):
return 1
return 0
def add_diff_ranges(self, diffinfo):
# type: (Dict[str, Set[int]]) -> None
def add_diff_ranges(self, diffinfo: Dict[str, Set[int]]) -> None:
"""Update the StyleGuide to filter out information not in the diff.
This provides information to the StyleGuide so that only the errors
@ -592,14 +593,15 @@ class StyleGuide(object):
self._parsed_diff = diffinfo
def find_more_specific(selected, ignored): # type: (str, str) -> Decision
def find_more_specific(selected: str, ignored: str) -> Decision:
if selected.startswith(ignored) and selected != ignored:
return Decision.Selected
return Decision.Ignored
def find_first_match(error_code, code_list):
# type: (str, Tuple[str, ...]) -> Optional[str]
def find_first_match(
error_code: str, code_list: Tuple[str, ...]
) -> Optional[str]:
startswith = error_code.startswith
for code in code_list:
if startswith(code):

View file

@ -1,6 +1,7 @@
"""Utility methods for flake8."""
import collections
import fnmatch as _fnmatch
import functools
import inspect
import io
import logging
@ -8,24 +9,33 @@ import os
import platform
import re
import sys
import textwrap
import tokenize
from typing import Callable, Dict, Generator, List, Optional, Pattern
from typing import Sequence, Set, Tuple, Union
from typing import Callable
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Pattern
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union
from flake8 import exceptions
from flake8._compat import lru_cache
if False: # `typing.TYPE_CHECKING` was introduced in 3.5.2
if TYPE_CHECKING:
from flake8.plugins.manager import Plugin
DIFF_HUNK_REGEXP = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? @@.*$")
COMMA_SEPARATED_LIST_RE = re.compile(r"[,\s]")
LOCAL_PLUGIN_LIST_RE = re.compile(r"[,\t\n\r\f\v]")
string_types = (str, type(u""))
def parse_comma_separated_list(value, regexp=COMMA_SEPARATED_LIST_RE):
# type: (str, Pattern[str]) -> List[str]
def parse_comma_separated_list(
value: str, regexp: Pattern[str] = COMMA_SEPARATED_LIST_RE
) -> List[str]:
"""Parse a comma-separated list.
:param value:
@ -40,7 +50,7 @@ def parse_comma_separated_list(value, regexp=COMMA_SEPARATED_LIST_RE):
:rtype:
list
"""
assert isinstance(value, string_types), value
assert isinstance(value, str), value
separated = regexp.split(value)
item_gen = (item.strip() for item in separated)
@ -59,8 +69,7 @@ _FILE_LIST_TOKEN_TYPES = [
]
def _tokenize_files_to_codes_mapping(value):
# type: (str) -> List[_Token]
def _tokenize_files_to_codes_mapping(value: str) -> List[_Token]:
tokens = []
i = 0
while i < len(value):
@ -77,8 +86,9 @@ def _tokenize_files_to_codes_mapping(value):
return tokens
def parse_files_to_codes_mapping(value_): # noqa: C901
# type: (Union[Sequence[str], str]) -> List[Tuple[str, List[str]]]
def parse_files_to_codes_mapping( # noqa: C901
value_: Union[Sequence[str], str]
) -> List[Tuple[str, List[str]]]:
"""Parse a files-to-codes mapping.
A files-to-codes mapping a sequence of values specified as
@ -88,22 +98,22 @@ def parse_files_to_codes_mapping(value_): # noqa: C901
:param value: String to be parsed and normalized.
:type value: str
"""
if not isinstance(value_, string_types):
if not isinstance(value_, str):
value = "\n".join(value_)
else:
value = value_
ret = [] # type: List[Tuple[str, List[str]]]
ret: List[Tuple[str, List[str]]] = []
if not value.strip():
return ret
class State:
seen_sep = True
seen_colon = False
filenames = [] # type: List[str]
codes = [] # type: List[str]
filenames: List[str] = []
codes: List[str] = []
def _reset(): # type: () -> None
def _reset() -> None:
if State.codes:
for filename in State.filenames:
ret.append((filename, State.codes))
@ -112,16 +122,12 @@ def parse_files_to_codes_mapping(value_): # noqa: C901
State.filenames = []
State.codes = []
def _unexpected_token(): # type: () -> exceptions.ExecutionError
def _indent(s): # type: (str) -> str
return " " + s.strip().replace("\n", "\n ")
def _unexpected_token() -> exceptions.ExecutionError:
return exceptions.ExecutionError(
"Expected `per-file-ignores` to be a mapping from file exclude "
"patterns to ignore codes.\n\n"
"Configured `per-file-ignores` setting:\n\n{}".format(
_indent(value)
)
f"Expected `per-file-ignores` to be a mapping from file exclude "
f"patterns to ignore codes.\n\n"
f"Configured `per-file-ignores` setting:\n\n"
f"{textwrap.indent(value.strip(), ' ')}"
)
for token in _tokenize_files_to_codes_mapping(value):
@ -155,8 +161,9 @@ def parse_files_to_codes_mapping(value_): # noqa: C901
return ret
def normalize_paths(paths, parent=os.curdir):
# type: (Sequence[str], str) -> List[str]
def normalize_paths(
paths: Sequence[str], parent: str = os.curdir
) -> List[str]:
"""Normalize a list of paths relative to a parent directory.
:returns:
@ -168,8 +175,7 @@ def normalize_paths(paths, parent=os.curdir):
return [normalize_path(p, parent) for p in paths]
def normalize_path(path, parent=os.curdir):
# type: (str, str) -> str
def normalize_path(path: str, parent: str = os.curdir) -> str:
"""Normalize a single-path.
:returns:
@ -190,7 +196,9 @@ def normalize_path(path, parent=os.curdir):
return path.rstrip(separator + alternate_separator)
def _stdin_get_value_py3(): # type: () -> str
@functools.lru_cache(maxsize=1)
def stdin_get_value() -> str:
"""Get and cache it so plugins can use it."""
stdin_value = sys.stdin.buffer.read()
fd = io.BytesIO(stdin_value)
try:
@ -201,25 +209,12 @@ def _stdin_get_value_py3(): # type: () -> str
return stdin_value.decode("utf-8")
@lru_cache(maxsize=1)
def stdin_get_value(): # type: () -> str
"""Get and cache it so plugins can use it."""
if sys.version_info < (3,):
return sys.stdin.read()
else:
return _stdin_get_value_py3()
def stdin_get_lines(): # type: () -> List[str]
def stdin_get_lines() -> List[str]:
"""Return lines of stdin split according to file splitting."""
if sys.version_info < (3,):
return list(io.BytesIO(stdin_get_value()))
else:
return list(io.StringIO(stdin_get_value()))
return list(io.StringIO(stdin_get_value()))
def parse_unified_diff(diff=None):
# type: (Optional[str]) -> Dict[str, Set[int]]
def parse_unified_diff(diff: Optional[str] = None) -> Dict[str, Set[int]]:
"""Parse the unified diff passed on stdin.
:returns:
@ -233,7 +228,7 @@ def parse_unified_diff(diff=None):
number_of_rows = None
current_path = None
parsed_paths = collections.defaultdict(set) # type: Dict[str, Set[int]]
parsed_paths: Dict[str, Set[int]] = collections.defaultdict(set)
for line in diff.splitlines():
if number_of_rows:
# NOTE(sigmavirus24): Below we use a slice because stdin may be
@ -291,8 +286,7 @@ def parse_unified_diff(diff=None):
return parsed_paths
def is_windows():
# type: () -> bool
def is_windows() -> bool:
"""Determine if we're running on Windows.
:returns:
@ -303,8 +297,7 @@ def is_windows():
return os.name == "nt"
def is_using_stdin(paths):
# type: (List[str]) -> bool
def is_using_stdin(paths: List[str]) -> bool:
"""Determine if we're going to read from stdin.
:param list paths:
@ -317,12 +310,14 @@ def is_using_stdin(paths):
return "-" in paths
def _default_predicate(*args): # type: (*str) -> bool
def _default_predicate(*args: str) -> bool:
return False
def filenames_from(arg, predicate=None):
# type: (str, Optional[Callable[[str], bool]]) -> Generator[str, None, None] # noqa: E501
def filenames_from(
arg: str, predicate: Optional[Callable[[str], bool]] = None
) -> Generator[str, None, None]:
# noqa: E501
"""Generate filenames from an argument.
:param str arg:
@ -362,8 +357,7 @@ def filenames_from(arg, predicate=None):
yield arg
def fnmatch(filename, patterns):
# type: (str, Sequence[str]) -> bool
def fnmatch(filename: str, patterns: Sequence[str]) -> bool:
"""Wrap :func:`fnmatch.fnmatch` to add some functionality.
:param str filename:
@ -381,8 +375,7 @@ def fnmatch(filename, patterns):
return any(_fnmatch.fnmatch(filename, pattern) for pattern in patterns)
def parameters_for(plugin):
# type: (Plugin) -> Dict[str, bool]
def parameters_for(plugin: "Plugin") -> Dict[str, bool]:
"""Return the parameters for the plugin.
This will inspect the plugin and return either the function parameters
@ -404,24 +397,11 @@ def parameters_for(plugin):
if is_class: # The plugin is a class
func = plugin.plugin.__init__
if sys.version_info < (3, 3):
argspec = inspect.getargspec(func)
start_of_optional_args = len(argspec[0]) - len(argspec[-1] or [])
parameter_names = argspec[0]
parameters = collections.OrderedDict(
[
(name, position < start_of_optional_args)
for position, name in enumerate(parameter_names)
]
)
else:
parameters = collections.OrderedDict(
[
(parameter.name, parameter.default is parameter.empty)
for parameter in inspect.signature(func).parameters.values()
if parameter.kind == parameter.POSITIONAL_OR_KEYWORD
]
)
parameters = {
parameter.name: parameter.default is parameter.empty
for parameter in inspect.signature(func).parameters.values()
if parameter.kind == parameter.POSITIONAL_OR_KEYWORD
}
if is_class:
parameters.pop("self", None)
@ -429,8 +409,12 @@ def parameters_for(plugin):
return parameters
def matches_filename(path, patterns, log_message, logger):
# type: (str, Sequence[str], str, logging.Logger) -> bool
def matches_filename(
path: str,
patterns: Sequence[str],
log_message: str,
logger: logging.Logger,
) -> bool:
"""Use fnmatch to discern if a path exists in patterns.
:param str path:
@ -462,7 +446,7 @@ def matches_filename(path, patterns, log_message, logger):
return match
def get_python_version(): # type: () -> str
def get_python_version() -> str:
"""Find and format the python implementation and version.
:returns:
@ -470,7 +454,7 @@ def get_python_version(): # type: () -> str
:rtype:
str
"""
return "%s %s on %s" % (
return "{} {} on {}".format(
platform.python_implementation(),
platform.python_version(),
platform.system(),

View file

@ -1,3 +1,4 @@
from some.module.that.has.nested.sub.modules import ClassWithVeryVeryVeryVeryLongName # noqa: E501,F401
from some.module.that.has.nested.sub.modules import \
ClassWithVeryVeryVeryVeryLongName # noqa: E501,F401
# ClassWithVeryVeryVeryVeryLongName()

View file

@ -1,7 +1,7 @@
"""Module that is off sys.path by default, for testing local-plugin-paths."""
class ExtensionTestPlugin2(object):
class ExtensionTestPlugin2:
"""Extension test plugin in its own directory."""
name = 'ExtensionTestPlugin2'

View file

@ -1,5 +1,6 @@
"""Integration tests for the checker submodule."""
import mock
from unittest import mock
import pytest
from flake8 import checker
@ -20,7 +21,7 @@ EXPECTED_RESULT_PHYSICAL_LINE = (
)
class PluginClass(object):
class PluginClass:
"""Simple file plugin class yielding the expected report."""
name = 'test'

View file

@ -1,8 +1,8 @@
"""Integration tests for the main entrypoint of flake8."""
import json
import os
from unittest import mock
import mock
import pytest
from flake8 import utils
@ -280,13 +280,13 @@ def test_obtaining_args_from_sys_argv_when_not_explicity_provided(capsys):
def test_cli_config_option_respected(tmp_path):
"""Test --config is used."""
config = tmp_path / "flake8.ini"
config.write_text(u"""\
config.write_text("""\
[flake8]
ignore = F401
""")
py_file = tmp_path / "t.py"
py_file.write_text(u"import os\n")
py_file.write_text("import os\n")
_call_main(["--config", str(config), str(py_file)])
@ -294,13 +294,13 @@ ignore = F401
def test_cli_isolated_overrides_config_option(tmp_path):
"""Test --isolated overrides --config."""
config = tmp_path / "flake8.ini"
config.write_text(u"""\
config.write_text("""\
[flake8]
ignore = F401
""")
py_file = tmp_path / "t.py"
py_file.write_text(u"import os\n")
py_file.write_text("import os\n")
_call_main(["--isolated", "--config", str(config), str(py_file)], retv=1)

View file

@ -1,12 +1,11 @@
"""Integration tests for plugin loading."""
from flake8.main import application
LOCAL_PLUGIN_CONFIG = 'tests/fixtures/config_files/local-plugin.ini'
LOCAL_PLUGIN_PATH_CONFIG = 'tests/fixtures/config_files/local-plugin-path.ini'
class ExtensionTestPlugin(object):
class ExtensionTestPlugin:
"""Extension test plugin."""
name = 'ExtensionTestPlugin'
@ -24,7 +23,7 @@ class ExtensionTestPlugin(object):
parser.add_option('--anopt')
class ReportTestPlugin(object):
class ReportTestPlugin:
"""Report test plugin."""
name = 'ReportTestPlugin'

View file

@ -1,8 +1,8 @@
"""Tests for the Application class."""
import argparse
import sys
from unittest import mock
import mock
import pytest
from flake8.main import application as app

View file

@ -1,7 +1,7 @@
"""Tests for the BaseFormatter object."""
import argparse
from unittest import mock
import mock
import pytest
from flake8 import style_guide

View file

@ -1,7 +1,7 @@
"""Tests for the Manager object for FileCheckers."""
import errno
from unittest import mock
import mock
import pytest
from flake8 import checker

View file

@ -1,9 +1,8 @@
# -*- coding: utf-8 -*-
"""Tests for the ConfigFileFinder."""
import configparser
import os
from unittest import mock
import mock
import pytest
from flake8.options import config

View file

@ -1,5 +1,6 @@
"""Tests for our debugging module."""
import mock
from unittest import mock
import pytest
from flake8.main import debug

View file

@ -1,5 +1,6 @@
"""Unit tests for the FileChecker class."""
import mock
from unittest import mock
import pytest
import flake8
@ -50,7 +51,7 @@ def test_nonexistent_file():
def test_raises_exception_on_failed_plugin(tmp_path, default_options):
"""Checks that a failing plugin results in PluginExecutionFailed."""
foobar = tmp_path / 'foobar.py'
foobar.write_text(u"I exist!") # Create temp file
foobar.write_text("I exist!") # Create temp file
plugin = {
"name": "failure",
"plugin_name": "failure", # Both are necessary

View file

@ -1,8 +1,8 @@
"""Tests for the FileProcessor class."""
import ast
import tokenize
from unittest import mock
import mock
import pytest
from flake8 import processor
@ -46,7 +46,7 @@ def test_read_lines_unknown_encoding(tmpdir, default_options):
@pytest.mark.parametrize('first_line', [
'\xEF\xBB\xBF"""Module docstring."""\n',
u'\uFEFF"""Module docstring."""\n',
'\uFEFF"""Module docstring."""\n',
])
def test_strip_utf_bom(first_line, default_options):
r"""Verify that we strip '\xEF\xBB\xBF' from the first line."""
@ -58,7 +58,7 @@ def test_strip_utf_bom(first_line, default_options):
@pytest.mark.parametrize('lines, expected', [
(['\xEF\xBB\xBF"""Module docstring."""\n'], False),
([u'\uFEFF"""Module docstring."""\n'], False),
(['\uFEFF"""Module docstring."""\n'], False),
(['#!/usr/bin/python', '# flake8 is great', 'a = 1'], False),
(['#!/usr/bin/python', '# flake8: noqa', 'a = 1'], True),
(['#!/usr/bin/python', '# flake8:noqa', 'a = 1'], True),
@ -130,7 +130,7 @@ def test_noqa_line_for(default_options):
])
for i in range(1, 4):
assert file_processor.noqa_line_for(i) == 'Line {0}\n'.format(i)
assert file_processor.noqa_line_for(i) == f'Line {i}\n'
def test_noqa_line_for_continuation(default_options):
@ -182,7 +182,7 @@ def test_next_line(default_options):
])
for i in range(1, 4):
assert file_processor.next_line() == 'Line {}'.format(i)
assert file_processor.next_line() == f'Line {i}'
assert file_processor.line_number == i

View file

@ -1,5 +1,5 @@
"""Tests for get_local_plugins."""
import mock
from unittest import mock
from flake8.options import config

View file

@ -1,8 +1,8 @@
"""Tests for Flake8's legacy API."""
import argparse
import os.path
from unittest import mock
import mock
import pytest
from flake8.api import legacy as api

View file

@ -1,7 +1,7 @@
"""Unit tests for flake8.options.config.MergedConfigParser."""
import os
from unittest import mock
import mock
import pytest
from flake8.options import config

View file

@ -1,7 +1,7 @@
"""Unit tests for flake8.options.manager.Option."""
import functools
from unittest import mock
import mock
import pytest
from flake8.options import manager

View file

@ -1,8 +1,8 @@
"""Unit tests for flake.options.manager.OptionManager."""
import argparse
import os
from unittest import mock
import mock
import pytest
from flake8 import utils

View file

@ -1,7 +1,7 @@
"""Tests for flake8.plugins.manager.Plugin."""
import argparse
from unittest import mock
import mock
import pytest
from flake8 import exceptions

View file

@ -1,5 +1,5 @@
"""Tests for flake8.plugins.manager.PluginManager."""
import mock
from unittest import mock
from flake8._compat import importlib_metadata
from flake8.plugins import manager

View file

@ -1,5 +1,6 @@
"""Tests for flake8.plugins.manager.PluginTypeManager."""
import mock
from unittest import mock
import pytest
from flake8 import exceptions

View file

@ -111,8 +111,8 @@ def test_statistic_for_retrieves_more_than_one_value():
"""Show this works for more than a couple statistic values."""
aggregator = stats.Statistics()
for i in range(50):
aggregator.record(make_error(code='E1{:02d}'.format(i)))
aggregator.record(make_error(code='W2{:02d}'.format(i)))
aggregator.record(make_error(code=f'E1{i:02d}'))
aggregator.record(make_error(code=f'W2{i:02d}'))
statistics = list(aggregator.statistics_for('E'))
assert len(statistics) == 50

View file

@ -1,7 +1,7 @@
"""Tests for the flake8.style_guide.StyleGuide class."""
import argparse
from unittest import mock
import mock
import pytest
from flake8 import statistics

View file

@ -3,8 +3,8 @@ import io
import logging
import os
import sys
from unittest import mock
import mock
import pytest
from flake8 import exceptions
@ -134,7 +134,7 @@ def test_normalize_path(value, expected):
(["flake8", "pep8", "pyflakes", "mccabe"],
["flake8", "pep8", "pyflakes", "mccabe"]),
(["../flake8", "../pep8", "../pyflakes", "../mccabe"],
[os.path.abspath("../" + p) for p in RELATIVE_PATHS]),
[os.path.abspath(f"../{p}") for p in RELATIVE_PATHS]),
])
def test_normalize_paths(value, expected):
"""Verify we normalizes a sequence of paths provided to the tool."""
@ -242,7 +242,7 @@ def test_filenames_from_exclude_doesnt_exclude_directory_names(tmpdir):
def test_parameters_for_class_plugin():
"""Verify that we can retrieve the parameters for a class plugin."""
class FakeCheck(object):
class FakeCheck:
def __init__(self, tree):
raise NotImplementedError
@ -268,7 +268,7 @@ def test_parameters_for_function_plugin():
def read_diff_file(filename):
"""Read the diff file in its entirety."""
with open(filename, 'r') as fd:
with open(filename) as fd:
content = fd.read()
return content
@ -308,7 +308,6 @@ def test_matches_filename_for_excluding_dotfiles():
assert not utils.matches_filename('..', ('.*',), '', logger)
@pytest.mark.xfail(sys.version_info < (3,), reason='py3+ only behaviour')
def test_stdin_get_value_crlf():
"""Ensure that stdin is normalized from crlf to lf."""
stdin = io.TextIOWrapper(io.BytesIO(b'1\r\n2\r\n'), 'UTF-8')

View file

@ -1,5 +1,6 @@
"""Tests for the flake8.style_guide.Violation class."""
import mock
from unittest import mock
import pytest
from flake8 import style_guide

23
tox.ini
View file

@ -1,10 +1,9 @@
[tox]
minversion=2.3.1
envlist = py27,py35,py36,py37,py38,flake8,linters,docs
envlist = py36,py37,py38,flake8,linters,docs
[testenv]
deps =
mock>=2.0.0
pytest!=3.0.5,!=5.2.3
coverage
commands =
@ -14,14 +13,8 @@ commands =
# ensure 100% coverage of tests
coverage report --fail-under 100 --include tests/*
[testenv:venv]
deps =
.
commands = {posargs}
# Dogfood our current master version
[testenv:dogfood]
basepython = python3
skip_install = true
deps =
wheel
@ -33,20 +26,17 @@ commands =
# Linters
[testenv:flake8]
basepython = python3
skip_install = true
deps =
flake8
flake8-bugbear
flake8-docstrings>=1.3.1
flake8-import-order>=0.9
flake8-typing-imports>=1.1
pep8-naming
commands =
flake8 src/flake8/ tests/ setup.py
[testenv:pylint]
basepython = python3
skip_install = true
deps =
pyflakes
@ -55,7 +45,6 @@ commands =
pylint src/flake8
[testenv:doc8]
basepython = python3
skip_install = true
deps =
sphinx
@ -64,14 +53,12 @@ commands =
doc8 docs/source/
[testenv:pre-commit]
basepython = python3
skip_install = true
deps = pre-commit
commands =
pre-commit run --all-files --show-diff-on-failure
[testenv:bandit]
basepython = python3
skip_install = true
deps =
bandit
@ -79,7 +66,6 @@ commands =
bandit -r src/flake8/ -c .bandit.yml
[testenv:linters]
basepython = python3
skip_install = true
deps =
{[testenv:flake8]deps}
@ -96,7 +82,6 @@ commands =
# Documentation
[testenv:docs]
basepython = python3
deps =
-rdocs/source/requirements.txt
commands =
@ -104,7 +89,6 @@ commands =
sphinx-build -E -W -c docs/source/ -b man docs/source/ docs/build/man
[testenv:serve-docs]
basepython = python3
skip_install = true
changedir = docs/build/html
deps =
@ -112,7 +96,6 @@ commands =
python -m http.server {posargs}
[testenv:readme]
basepython = python3
deps =
readme_renderer
commands =
@ -120,7 +103,6 @@ commands =
# Release tooling
[testenv:build]
basepython = python3
skip_install = true
deps =
wheel
@ -129,7 +111,6 @@ commands =
python setup.py -q sdist bdist_wheel
[testenv:release]
basepython = python3
skip_install = true
deps =
{[testenv:build]deps}
@ -159,5 +140,3 @@ exclude =
.cache,
.eggs
max-complexity = 10
import-order-style = google
application-import-names = flake8