This commit is contained in:
Anthony Sottile 2021-03-29 19:48:44 -07:00
parent 64a610ed19
commit cb36e206a5
19 changed files with 361 additions and 389 deletions

View file

@ -62,7 +62,7 @@ def configure_logging(verbosity, filename=None, logformat=LOG_FORMAT):
if not filename or filename in ("stderr", "stdout"): if not filename or filename in ("stderr", "stdout"):
fileobj = getattr(sys, filename or "stderr") fileobj = getattr(sys, filename or "stderr")
handler_cls = logging.StreamHandler # type: Type[logging.Handler] handler_cls: Type[logging.Handler] = logging.StreamHandler
else: else:
fileobj = filename fileobj = filename
handler_cls = logging.FileHandler handler_cls = logging.FileHandler

View file

@ -81,7 +81,7 @@ class StyleGuide:
self._file_checker_manager = application.file_checker_manager self._file_checker_manager = application.file_checker_manager
@property @property
def options(self): # type: () -> argparse.Namespace def options(self) -> argparse.Namespace:
"""Return application's options. """Return application's options.
An instance of :class:`argparse.Namespace` containing parsed options. An instance of :class:`argparse.Namespace` containing parsed options.

View file

@ -85,8 +85,8 @@ class Manager:
self.options = style_guide.options self.options = style_guide.options
self.checks = checker_plugins self.checks = checker_plugins
self.jobs = self._job_count() self.jobs = self._job_count()
self._all_checkers = [] # type: List[FileChecker] self._all_checkers: List[FileChecker] = []
self.checkers = [] # type: List[FileChecker] self.checkers: List[FileChecker] = []
self.statistics = { self.statistics = {
"files": 0, "files": 0,
"logical lines": 0, "logical lines": 0,
@ -103,8 +103,7 @@ class Manager:
self.statistics[statistic] += checker.statistics[statistic] self.statistics[statistic] += checker.statistics[statistic]
self.statistics["files"] += len(self.checkers) self.statistics["files"] += len(self.checkers)
def _job_count(self): def _job_count(self) -> int:
# type: () -> int
# First we walk through all of our error cases: # First we walk through all of our error cases:
# - multiprocessing library is not present # - multiprocessing library is not present
# - we're running on windows in which case we know we have significant # - we're running on windows in which case we know we have significant
@ -165,8 +164,7 @@ class Manager:
) )
return reported_results_count return reported_results_count
def is_path_excluded(self, path): def is_path_excluded(self, path: str) -> bool:
# type: (str) -> bool
"""Check if a path is excluded. """Check if a path is excluded.
:param str path: :param str path:
@ -189,8 +187,7 @@ class Manager:
logger=LOG, logger=LOG,
) )
def make_checkers(self, paths=None): def make_checkers(self, paths: Optional[List[str]] = None) -> None:
# type: (Optional[List[str]]) -> None
"""Create checkers for each file.""" """Create checkers for each file."""
if paths is None: if paths is None:
paths = self.arguments paths = self.arguments
@ -235,8 +232,7 @@ class Manager:
self.checkers = [c for c in self._all_checkers if c.should_process] self.checkers = [c for c in self._all_checkers if c.should_process]
LOG.info("Checking %d files", len(self.checkers)) LOG.info("Checking %d files", len(self.checkers))
def report(self): def report(self) -> Tuple[int, int]:
# type: () -> Tuple[int, int]
"""Report all of the errors found in the managed file checkers. """Report all of the errors found in the managed file checkers.
This iterates over each of the checkers and reports the errors sorted This iterates over each of the checkers and reports the errors sorted
@ -258,11 +254,11 @@ class Manager:
results_found += len(results) results_found += len(results)
return (results_found, results_reported) return (results_found, results_reported)
def run_parallel(self): # type: () -> None def run_parallel(self) -> None:
"""Run the checkers in parallel.""" """Run the checkers in parallel."""
# fmt: off # fmt: off
final_results = collections.defaultdict(list) # type: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] # noqa: E501 final_results: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] = collections.defaultdict(list) # noqa: E501
final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, int]] # noqa: E501 final_statistics: Dict[str, Dict[str, int]] = collections.defaultdict(dict) # noqa: E501
# fmt: on # fmt: on
pool = _try_initialize_processpool(self.jobs) pool = _try_initialize_processpool(self.jobs)
@ -297,12 +293,12 @@ class Manager:
checker.results = final_results[filename] checker.results = final_results[filename]
checker.statistics = final_statistics[filename] checker.statistics = final_statistics[filename]
def run_serial(self): # type: () -> None def run_serial(self) -> None:
"""Run the checkers in serial.""" """Run the checkers in serial."""
for checker in self.checkers: for checker in self.checkers:
checker.run_checks() checker.run_checks()
def run(self): # type: () -> None def run(self) -> None:
"""Run all the checkers. """Run all the checkers.
This will intelligently decide whether to run the checks in parallel This will intelligently decide whether to run the checks in parallel
@ -356,9 +352,7 @@ class FileChecker:
self.options = options self.options = options
self.filename = filename self.filename = filename
self.checks = checks self.checks = checks
# fmt: off self.results: List[Tuple[str, int, int, str, Optional[str]]] = []
self.results = [] # type: List[Tuple[str, int, int, str, Optional[str]]] # noqa: E501
# fmt: on
self.statistics = { self.statistics = {
"tokens": 0, "tokens": 0,
"logical lines": 0, "logical lines": 0,
@ -372,12 +366,11 @@ class FileChecker:
self.should_process = not self.processor.should_ignore_file() self.should_process = not self.processor.should_ignore_file()
self.statistics["physical lines"] = len(self.processor.lines) self.statistics["physical lines"] = len(self.processor.lines)
def __repr__(self): # type: () -> str def __repr__(self) -> str:
"""Provide helpful debugging representation.""" """Provide helpful debugging representation."""
return f"FileChecker for {self.filename}" return f"FileChecker for {self.filename}"
def _make_processor(self): def _make_processor(self) -> Optional[processor.FileProcessor]:
# type: () -> Optional[processor.FileProcessor]
try: try:
return processor.FileProcessor(self.filename, self.options) return processor.FileProcessor(self.filename, self.options)
except OSError as e: except OSError as e:
@ -391,8 +384,13 @@ class FileChecker:
self.report("E902", 0, 0, message) self.report("E902", 0, 0, message)
return None return None
def report(self, error_code, line_number, column, text): def report(
# type: (Optional[str], int, int, str) -> str self,
error_code: Optional[str],
line_number: int,
column: int,
text: str,
) -> str:
"""Report an error by storing it in the results list.""" """Report an error by storing it in the results list."""
if error_code is None: if error_code is None:
error_code, text = text.split(" ", 1) error_code, text = text.split(" ", 1)
@ -468,7 +466,7 @@ class FileChecker:
column -= column_offset column -= column_offset
return row, column return row, column
def run_ast_checks(self): # type: () -> None def run_ast_checks(self) -> None:
"""Run all checks expecting an abstract syntax tree.""" """Run all checks expecting an abstract syntax tree."""
try: try:
ast = self.processor.build_ast() ast = self.processor.build_ast()
@ -610,8 +608,9 @@ class FileChecker:
else: else:
self.run_logical_checks() self.run_logical_checks()
def check_physical_eol(self, token, prev_physical): def check_physical_eol(
# type: (processor._Token, str) -> None self, token: processor._Token, prev_physical: str
) -> None:
"""Run physical checks if and only if it is at the end of the line.""" """Run physical checks if and only if it is at the end of the line."""
# a newline token ends a single physical line. # a newline token ends a single physical line.
if processor.is_eol_token(token): if processor.is_eol_token(token):
@ -640,13 +639,14 @@ class FileChecker:
self.run_physical_checks(line + "\n") self.run_physical_checks(line + "\n")
def _pool_init(): # type: () -> None def _pool_init() -> None:
"""Ensure correct signaling of ^C using multiprocessing.Pool.""" """Ensure correct signaling of ^C using multiprocessing.Pool."""
signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN)
def _try_initialize_processpool(job_count): def _try_initialize_processpool(
# type: (int) -> Optional[multiprocessing.pool.Pool] job_count: int,
) -> Optional[multiprocessing.pool.Pool]:
"""Return a new process pool instance if we are able to create one.""" """Return a new process pool instance if we are able to create one."""
try: try:
return multiprocessing.Pool(job_count, _pool_init) return multiprocessing.Pool(job_count, _pool_init)
@ -675,8 +675,9 @@ def _run_checks(checker):
return checker.run_checks() return checker.run_checks()
def find_offset(offset, mapping): def find_offset(
# type: (int, processor._LogicalMapping) -> Tuple[int, int] offset: int, mapping: processor._LogicalMapping
) -> Tuple[int, int]:
"""Find the offset tuple for a single offset.""" """Find the offset tuple for a single offset."""
if isinstance(offset, tuple): if isinstance(offset, tuple):
return offset return offset

View file

@ -19,14 +19,13 @@ class FailedToLoadPlugin(Flake8Exception):
FORMAT = 'Flake8 failed to load plugin "%(name)s" due to %(exc)s.' FORMAT = 'Flake8 failed to load plugin "%(name)s" due to %(exc)s.'
def __init__(self, plugin_name, exception): def __init__(self, plugin_name: str, exception: Exception) -> None:
# type: (str, Exception) -> None
"""Initialize our FailedToLoadPlugin exception.""" """Initialize our FailedToLoadPlugin exception."""
self.plugin_name = plugin_name self.plugin_name = plugin_name
self.original_exception = exception self.original_exception = exception
super().__init__(plugin_name, exception) super().__init__(plugin_name, exception)
def __str__(self): # type: () -> str def __str__(self) -> str:
"""Format our exception message.""" """Format our exception message."""
return self.FORMAT % { return self.FORMAT % {
"name": self.plugin_name, "name": self.plugin_name,
@ -37,7 +36,7 @@ class FailedToLoadPlugin(Flake8Exception):
class InvalidSyntax(Flake8Exception): class InvalidSyntax(Flake8Exception):
"""Exception raised when tokenizing a file fails.""" """Exception raised when tokenizing a file fails."""
def __init__(self, exception): # type: (Exception) -> None def __init__(self, exception: Exception) -> None:
"""Initialize our InvalidSyntax exception.""" """Initialize our InvalidSyntax exception."""
self.original_exception = exception self.original_exception = exception
self.error_message = "{}: {}".format( self.error_message = "{}: {}".format(
@ -48,7 +47,7 @@ class InvalidSyntax(Flake8Exception):
self.column_number = 0 self.column_number = 0
super().__init__(exception) super().__init__(exception)
def __str__(self): # type: () -> str def __str__(self) -> str:
"""Format our exception message.""" """Format our exception message."""
return self.error_message return self.error_message
@ -58,14 +57,13 @@ class PluginRequestedUnknownParameters(Flake8Exception):
FORMAT = '"%(name)s" requested unknown parameters causing %(exc)s' FORMAT = '"%(name)s" requested unknown parameters causing %(exc)s'
def __init__(self, plugin, exception): def __init__(self, plugin: Dict[str, str], exception: Exception) -> None:
# type: (Dict[str, str], Exception) -> None
"""Pop certain keyword arguments for initialization.""" """Pop certain keyword arguments for initialization."""
self.plugin = plugin self.plugin = plugin
self.original_exception = exception self.original_exception = exception
super().__init__(plugin, exception) super().__init__(plugin, exception)
def __str__(self): # type: () -> str def __str__(self) -> str:
"""Format our exception message.""" """Format our exception message."""
return self.FORMAT % { return self.FORMAT % {
"name": self.plugin["plugin_name"], "name": self.plugin["plugin_name"],
@ -78,14 +76,13 @@ class PluginExecutionFailed(Flake8Exception):
FORMAT = '"%(name)s" failed during execution due to "%(exc)s"' FORMAT = '"%(name)s" failed during execution due to "%(exc)s"'
def __init__(self, plugin, exception): def __init__(self, plugin: Dict[str, str], exception: Exception) -> None:
# type: (Dict[str, str], Exception) -> None
"""Utilize keyword arguments for message generation.""" """Utilize keyword arguments for message generation."""
self.plugin = plugin self.plugin = plugin
self.original_exception = exception self.original_exception = exception
super().__init__(plugin, exception) super().__init__(plugin, exception)
def __str__(self): # type: () -> str def __str__(self) -> str:
"""Format our exception message.""" """Format our exception message."""
return self.FORMAT % { return self.FORMAT % {
"name": self.plugin["plugin_name"], "name": self.plugin["plugin_name"],

View file

@ -33,8 +33,7 @@ class BaseFormatter:
output filename has been specified. output filename has been specified.
""" """
def __init__(self, options): def __init__(self, options: argparse.Namespace) -> None:
# type: (argparse.Namespace) -> None
"""Initialize with the options parsed from config and cli. """Initialize with the options parsed from config and cli.
This also calls a hook, :meth:`after_init`, so subclasses do not need This also calls a hook, :meth:`after_init`, so subclasses do not need
@ -48,14 +47,14 @@ class BaseFormatter:
""" """
self.options = options self.options = options
self.filename = options.output_file self.filename = options.output_file
self.output_fd = None # type: Optional[IO[str]] self.output_fd: Optional[IO[str]] = None
self.newline = "\n" self.newline = "\n"
self.after_init() self.after_init()
def after_init(self): # type: () -> None def after_init(self) -> None:
"""Initialize the formatter further.""" """Initialize the formatter further."""
def beginning(self, filename): # type: (str) -> None def beginning(self, filename: str) -> None:
"""Notify the formatter that we're starting to process a file. """Notify the formatter that we're starting to process a file.
:param str filename: :param str filename:
@ -63,7 +62,7 @@ class BaseFormatter:
from. from.
""" """
def finished(self, filename): # type: (str) -> None def finished(self, filename: str) -> None:
"""Notify the formatter that we've finished processing a file. """Notify the formatter that we've finished processing a file.
:param str filename: :param str filename:
@ -71,7 +70,7 @@ class BaseFormatter:
from. from.
""" """
def start(self): # type: () -> None def start(self) -> None:
"""Prepare the formatter to receive input. """Prepare the formatter to receive input.
This defaults to initializing :attr:`output_fd` if :attr:`filename` This defaults to initializing :attr:`output_fd` if :attr:`filename`
@ -79,7 +78,7 @@ class BaseFormatter:
if self.filename: if self.filename:
self.output_fd = open(self.filename, "a") self.output_fd = open(self.filename, "a")
def handle(self, error): # type: (Violation) -> None def handle(self, error: "Violation") -> None:
"""Handle an error reported by Flake8. """Handle an error reported by Flake8.
This defaults to calling :meth:`format`, :meth:`show_source`, and This defaults to calling :meth:`format`, :meth:`show_source`, and
@ -96,7 +95,7 @@ class BaseFormatter:
source = self.show_source(error) source = self.show_source(error)
self.write(line, source) self.write(line, source)
def format(self, error): # type: (Violation) -> Optional[str] def format(self, error: "Violation") -> Optional[str]:
"""Format an error reported by Flake8. """Format an error reported by Flake8.
This method **must** be implemented by subclasses. This method **must** be implemented by subclasses.
@ -115,7 +114,7 @@ class BaseFormatter:
"Subclass of BaseFormatter did not implement" " format." "Subclass of BaseFormatter did not implement" " format."
) )
def show_statistics(self, statistics): # type: (Statistics) -> None def show_statistics(self, statistics: "Statistics") -> None:
"""Format and print the statistics.""" """Format and print the statistics."""
for error_code in statistics.error_codes(): for error_code in statistics.error_codes():
stats_for_error_code = statistics.statistics_for(error_code) stats_for_error_code = statistics.statistics_for(error_code)
@ -130,8 +129,7 @@ class BaseFormatter:
) )
) )
def show_benchmarks(self, benchmarks): def show_benchmarks(self, benchmarks: List[Tuple[str, float]]) -> None:
# type: (List[Tuple[str, float]]) -> None
"""Format and print the benchmarks.""" """Format and print the benchmarks."""
# NOTE(sigmavirus24): The format strings are a little confusing, even # NOTE(sigmavirus24): The format strings are a little confusing, even
# to me, so here's a quick explanation: # to me, so here's a quick explanation:
@ -152,7 +150,7 @@ class BaseFormatter:
benchmark = float_format(statistic=statistic, value=value) benchmark = float_format(statistic=statistic, value=value)
self._write(benchmark) self._write(benchmark)
def show_source(self, error): # type: (Violation) -> Optional[str] def show_source(self, error: "Violation") -> Optional[str]:
"""Show the physical line generating the error. """Show the physical line generating the error.
This also adds an indicator for the particular part of the line that This also adds an indicator for the particular part of the line that
@ -183,15 +181,14 @@ class BaseFormatter:
# one # one
return f"{error.physical_line}{indent}^" return f"{error.physical_line}{indent}^"
def _write(self, output): # type: (str) -> None def _write(self, output: str) -> None:
"""Handle logic of whether to use an output file or print().""" """Handle logic of whether to use an output file or print()."""
if self.output_fd is not None: if self.output_fd is not None:
self.output_fd.write(output + self.newline) self.output_fd.write(output + self.newline)
if self.output_fd is None or self.options.tee: if self.output_fd is None or self.options.tee:
print(output, end=self.newline) print(output, end=self.newline)
def write(self, line, source): def write(self, line: Optional[str], source: Optional[str]) -> None:
# type: (Optional[str], Optional[str]) -> None
"""Write the line either to the output file or stdout. """Write the line either to the output file or stdout.
This handles deciding whether to write to a file or print to standard This handles deciding whether to write to a file or print to standard
@ -209,7 +206,7 @@ class BaseFormatter:
if source: if source:
self._write(source) self._write(source)
def stop(self): # type: () -> None def stop(self) -> None:
"""Clean up after reporting is finished.""" """Clean up after reporting is finished."""
if self.output_fd is not None: if self.output_fd is not None:
self.output_fd.close() self.output_fd.close()

View file

@ -25,9 +25,9 @@ class SimpleFormatter(base.BaseFormatter):
""" """
error_format = None # type: str error_format: str
def format(self, error): # type: (Violation) -> Optional[str] def format(self, error: "Violation") -> Optional[str]:
"""Format and write error out. """Format and write error out.
If an output filename is specified, write formatted errors to that If an output filename is specified, write formatted errors to that
@ -51,7 +51,7 @@ class Default(SimpleFormatter):
error_format = "%(path)s:%(row)d:%(col)d: %(code)s %(text)s" error_format = "%(path)s:%(row)d:%(col)d: %(code)s %(text)s"
def after_init(self): # type: () -> None def after_init(self) -> None:
"""Check for a custom format string.""" """Check for a custom format string."""
if self.options.format.lower() != "default": if self.options.format.lower() != "default":
self.error_format = self.options.format self.error_format = self.options.format
@ -68,14 +68,14 @@ class FilenameOnly(SimpleFormatter):
error_format = "%(path)s" error_format = "%(path)s"
def after_init(self): # type: () -> None def after_init(self) -> None:
"""Initialize our set of filenames.""" """Initialize our set of filenames."""
self.filenames_already_printed = set() # type: Set[str] self.filenames_already_printed: Set[str] = set()
def show_source(self, error): # type: (Violation) -> Optional[str] def show_source(self, error: "Violation") -> Optional[str]:
"""Do not include the source code.""" """Do not include the source code."""
def format(self, error): # type: (Violation) -> Optional[str] def format(self, error: "Violation") -> Optional[str]:
"""Ensure we only print each error once.""" """Ensure we only print each error once."""
if error.filename not in self.filenames_already_printed: if error.filename not in self.filenames_already_printed:
self.filenames_already_printed.add(error.filename) self.filenames_already_printed.add(error.filename)
@ -87,8 +87,8 @@ class FilenameOnly(SimpleFormatter):
class Nothing(base.BaseFormatter): class Nothing(base.BaseFormatter):
"""Print absolutely nothing.""" """Print absolutely nothing."""
def format(self, error): # type: (Violation) -> Optional[str] def format(self, error: "Violation") -> Optional[str]:
"""Do nothing.""" """Do nothing."""
def show_source(self, error): # type: (Violation) -> Optional[str] def show_source(self, error: "Violation") -> Optional[str]:
"""Do not print the source.""" """Do not print the source."""

View file

@ -44,7 +44,7 @@ class Application:
#: The timestamp when the Application instance was instantiated. #: The timestamp when the Application instance was instantiated.
self.start_time = time.time() self.start_time = time.time()
#: The timestamp when the Application finished reported errors. #: The timestamp when the Application finished reported errors.
self.end_time = None # type: float self.end_time: float = None
#: The name of the program being run #: The name of the program being run
self.program = program self.program = program
#: The version of the program being run #: The version of the program being run
@ -63,26 +63,24 @@ class Application:
options.register_default_options(self.option_manager) options.register_default_options(self.option_manager)
#: The instance of :class:`flake8.plugins.manager.Checkers` #: The instance of :class:`flake8.plugins.manager.Checkers`
self.check_plugins = None # type: plugin_manager.Checkers self.check_plugins: plugin_manager.Checkers = None
# fmt: off
#: The instance of :class:`flake8.plugins.manager.ReportFormatters` #: The instance of :class:`flake8.plugins.manager.ReportFormatters`
self.formatting_plugins = None # type: plugin_manager.ReportFormatters self.formatting_plugins: plugin_manager.ReportFormatters = None
# fmt: on
#: The user-selected formatter from :attr:`formatting_plugins` #: The user-selected formatter from :attr:`formatting_plugins`
self.formatter = None # type: BaseFormatter self.formatter: BaseFormatter = None
#: The :class:`flake8.style_guide.StyleGuideManager` built from the #: The :class:`flake8.style_guide.StyleGuideManager` built from the
#: user's options #: user's options
self.guide = None # type: style_guide.StyleGuideManager self.guide: style_guide.StyleGuideManager = None
#: The :class:`flake8.checker.Manager` that will handle running all of #: The :class:`flake8.checker.Manager` that will handle running all of
#: the checks selected by the user. #: the checks selected by the user.
self.file_checker_manager = None # type: checker.Manager self.file_checker_manager: checker.Manager = None
#: The user-supplied options parsed into an instance of #: The user-supplied options parsed into an instance of
#: :class:`argparse.Namespace` #: :class:`argparse.Namespace`
self.options = None # type: argparse.Namespace self.options: argparse.Namespace = None
#: The left over arguments that were not parsed by #: The left over arguments that were not parsed by
#: :attr:`option_manager` #: :attr:`option_manager`
self.args = None # type: List[str] self.args: List[str] = None
#: The number of errors, warnings, and other messages after running #: The number of errors, warnings, and other messages after running
#: flake8 and taking into account ignored errors and lines. #: flake8 and taking into account ignored errors and lines.
self.result_count = 0 self.result_count = 0
@ -96,10 +94,11 @@ class Application:
#: Whether the program is processing a diff or not #: Whether the program is processing a diff or not
self.running_against_diff = False self.running_against_diff = False
#: The parsed diff information #: The parsed diff information
self.parsed_diff = {} # type: Dict[str, Set[int]] self.parsed_diff: Dict[str, Set[int]] = {}
def parse_preliminary_options(self, argv): def parse_preliminary_options(
# type: (List[str]) -> Tuple[argparse.Namespace, List[str]] self, argv: List[str]
) -> Tuple[argparse.Namespace, List[str]]:
"""Get preliminary options from the CLI, pre-plugin-loading. """Get preliminary options from the CLI, pre-plugin-loading.
We need to know the values of a few standard options so that we can We need to know the values of a few standard options so that we can
@ -123,8 +122,7 @@ class Application:
rest.extend(("--output-file", args.output_file)) rest.extend(("--output-file", args.output_file))
return args, rest return args, rest
def exit(self): def exit(self) -> None:
# type: () -> None
"""Handle finalization and exiting the program. """Handle finalization and exiting the program.
This should be the last thing called on the application instance. It This should be the last thing called on the application instance. It
@ -140,8 +138,7 @@ class Application:
(self.result_count > 0) or self.catastrophic_failure (self.result_count > 0) or self.catastrophic_failure
) )
def find_plugins(self, config_finder): def find_plugins(self, config_finder: config.ConfigFileFinder) -> None:
# type: (config.ConfigFileFinder) -> None
"""Find and load the plugins for this application. """Find and load the plugins for this application.
Set the :attr:`check_plugins` and :attr:`formatting_plugins` attributes Set the :attr:`check_plugins` and :attr:`formatting_plugins` attributes
@ -163,8 +160,7 @@ class Application:
self.check_plugins.load_plugins() self.check_plugins.load_plugins()
self.formatting_plugins.load_plugins() self.formatting_plugins.load_plugins()
def register_plugin_options(self): def register_plugin_options(self) -> None:
# type: () -> None
"""Register options provided by plugins to our option manager.""" """Register options provided by plugins to our option manager."""
self.check_plugins.register_options(self.option_manager) self.check_plugins.register_options(self.option_manager)
self.check_plugins.register_plugin_versions(self.option_manager) self.check_plugins.register_plugin_versions(self.option_manager)
@ -172,10 +168,9 @@ class Application:
def parse_configuration_and_cli( def parse_configuration_and_cli(
self, self,
config_finder, # type: config.ConfigFileFinder config_finder: config.ConfigFileFinder,
argv, # type: List[str] argv: List[str],
): ) -> None:
# type: (...) -> None
"""Parse configuration files and the CLI options. """Parse configuration files and the CLI options.
:param config.ConfigFileFinder config_finder: :param config.ConfigFileFinder config_finder:
@ -215,8 +210,9 @@ class Application:
return formatter_plugin.execute return formatter_plugin.execute
def make_formatter(self, formatter_class=None): def make_formatter(
# type: (Optional[Type[BaseFormatter]]) -> None self, formatter_class: Optional[Type["BaseFormatter"]] = None
) -> None:
"""Initialize a formatter based on the parsed options.""" """Initialize a formatter based on the parsed options."""
format_plugin = self.options.format format_plugin = self.options.format
if 1 <= self.options.quiet < 2: if 1 <= self.options.quiet < 2:
@ -229,8 +225,7 @@ class Application:
self.formatter = formatter_class(self.options) self.formatter = formatter_class(self.options)
def make_guide(self): def make_guide(self) -> None:
# type: () -> None
"""Initialize our StyleGuide.""" """Initialize our StyleGuide."""
self.guide = style_guide.StyleGuideManager( self.guide = style_guide.StyleGuideManager(
self.options, self.formatter self.options, self.formatter
@ -239,8 +234,7 @@ class Application:
if self.running_against_diff: if self.running_against_diff:
self.guide.add_diff_ranges(self.parsed_diff) self.guide.add_diff_ranges(self.parsed_diff)
def make_file_checker_manager(self): def make_file_checker_manager(self) -> None:
# type: () -> None
"""Initialize our FileChecker Manager.""" """Initialize our FileChecker Manager."""
self.file_checker_manager = checker.Manager( self.file_checker_manager = checker.Manager(
style_guide=self.guide, style_guide=self.guide,
@ -248,8 +242,7 @@ class Application:
checker_plugins=self.check_plugins, checker_plugins=self.check_plugins,
) )
def run_checks(self, files=None): def run_checks(self, files: Optional[List[str]] = None) -> None:
# type: (Optional[List[str]]) -> None
"""Run the actual checks with the FileChecker Manager. """Run the actual checks with the FileChecker Manager.
This method encapsulates the logic to make a This method encapsulates the logic to make a
@ -289,8 +282,7 @@ class Application:
self.formatter.show_benchmarks(statistics) self.formatter.show_benchmarks(statistics)
def report_errors(self): def report_errors(self) -> None:
# type: () -> None
"""Report all the errors found by flake8 3.0. """Report all the errors found by flake8 3.0.
This also updates the :attr:`result_count` attribute with the total This also updates the :attr:`result_count` attribute with the total
@ -312,8 +304,7 @@ class Application:
self.formatter.show_statistics(self.guide.stats) self.formatter.show_statistics(self.guide.stats)
def initialize(self, argv): def initialize(self, argv: List[str]) -> None:
# type: (List[str]) -> None
"""Initialize the application to be run. """Initialize the application to be run.
This finds the plugins, registers their options, and parses the This finds the plugins, registers their options, and parses the
@ -347,14 +338,12 @@ class Application:
self.report_benchmarks() self.report_benchmarks()
self.formatter.stop() self.formatter.stop()
def _run(self, argv): def _run(self, argv: List[str]) -> None:
# type: (List[str]) -> None
self.initialize(argv) self.initialize(argv)
self.run_checks() self.run_checks()
self.report() self.report()
def run(self, argv): def run(self, argv: List[str]) -> None:
# type: (List[str]) -> None
"""Run our application. """Run our application.
This method will also handle KeyboardInterrupt exceptions for the This method will also handle KeyboardInterrupt exceptions for the

View file

@ -6,8 +6,7 @@ from typing import Optional
from flake8.main import application from flake8.main import application
def main(argv=None): def main(argv: Optional[List[str]] = None) -> None:
# type: (Optional[List[str]]) -> None
"""Execute the main bit of the application. """Execute the main bit of the application.
This handles the creation of an instance of :class:`Application`, runs it, This handles the creation of an instance of :class:`Application`, runs it,

View file

@ -59,6 +59,6 @@ def plugins_from(option_manager):
] ]
def dependencies(): # type: () -> List[Dict[str, str]] def dependencies() -> List[Dict[str, str]]:
"""Generate the list of dependencies we care about.""" """Generate the list of dependencies we care about."""
return [] return []

View file

@ -6,8 +6,7 @@ from flake8 import defaults
from flake8.main import debug from flake8.main import debug
def register_preliminary_options(parser): def register_preliminary_options(parser: argparse.ArgumentParser) -> None:
# type: (argparse.ArgumentParser) -> None
"""Register the preliminary options on our OptionManager. """Register the preliminary options on our OptionManager.
The preliminary options include: The preliminary options include:
@ -64,7 +63,7 @@ def register_preliminary_options(parser):
class JobsArgument: class JobsArgument:
"""Type callback for the --jobs argument.""" """Type callback for the --jobs argument."""
def __init__(self, arg): # type: (str) -> None def __init__(self, arg: str) -> None:
"""Parse and validate the --jobs argument. """Parse and validate the --jobs argument.
:param str arg: :param str arg:

View file

@ -15,10 +15,10 @@ LOG = logging.getLogger(__name__)
def aggregate_options( def aggregate_options(
manager, # type: OptionManager manager: OptionManager,
config_finder, # type: config.ConfigFileFinder config_finder: config.ConfigFileFinder,
argv, # type: List[str] argv: List[str],
): # type: (...) -> Tuple[argparse.Namespace, List[str]] ) -> Tuple[argparse.Namespace, List[str]]:
"""Aggregate and merge CLI and config file options. """Aggregate and merge CLI and config file options.
:param flake8.options.manager.OptionManager manager: :param flake8.options.manager.OptionManager manager:

View file

@ -19,12 +19,11 @@ class ConfigFileFinder:
def __init__( def __init__(
self, self,
program_name, program_name: str,
extra_config_files=None, extra_config_files: Optional[List[str]] = None,
config_file=None, config_file: Optional[str] = None,
ignore_config_files=False, ignore_config_files: bool = False,
): ) -> None:
# type: (str, Optional[List[str]], Optional[str], bool) -> None
"""Initialize object to find config files. """Initialize object to find config files.
:param str program_name: :param str program_name:
@ -57,8 +56,7 @@ class ConfigFileFinder:
self.local_directory = os.path.abspath(os.curdir) self.local_directory = os.path.abspath(os.curdir)
@staticmethod @staticmethod
def _user_config_file(program_name): def _user_config_file(program_name: str) -> str:
# type: (str) -> str
if utils.is_windows(): if utils.is_windows():
home_dir = os.path.expanduser("~") home_dir = os.path.expanduser("~")
config_file_basename = "." + program_name config_file_basename = "." + program_name
@ -71,8 +69,9 @@ class ConfigFileFinder:
return os.path.join(home_dir, config_file_basename) return os.path.join(home_dir, config_file_basename)
@staticmethod @staticmethod
def _read_config(*files): def _read_config(
# type: (*str) -> Tuple[configparser.RawConfigParser, List[str]] *files: str,
) -> Tuple[configparser.RawConfigParser, List[str]]:
config = configparser.RawConfigParser() config = configparser.RawConfigParser()
found_files = [] found_files = []
@ -93,8 +92,7 @@ class ConfigFileFinder:
) )
return (config, found_files) return (config, found_files)
def cli_config(self, files): def cli_config(self, files: str) -> configparser.RawConfigParser:
# type: (str) -> configparser.RawConfigParser
"""Read and parse the config file specified on the command-line.""" """Read and parse the config file specified on the command-line."""
config, found_files = self._read_config(files) config, found_files = self._read_config(files)
if found_files: if found_files:
@ -360,7 +358,7 @@ def get_local_plugins(config_finder):
raw_paths = utils.parse_comma_separated_list( raw_paths = utils.parse_comma_separated_list(
config.get(section, "paths").strip() config.get(section, "paths").strip()
) )
norm_paths = [] # type: List[str] norm_paths: List[str] = []
for base_dir in base_dirs: for base_dir in base_dirs:
norm_paths.extend( norm_paths.extend(
path path

View file

@ -32,7 +32,7 @@ LOG = logging.getLogger(__name__)
_ARG = enum.Enum("_ARG", "NO") _ARG = enum.Enum("_ARG", "NO")
_optparse_callable_map = { _optparse_callable_map: Dict[str, Union[Type[Any], _ARG]] = {
"int": int, "int": int,
"long": int, "long": int,
"string": str, "string": str,
@ -41,14 +41,13 @@ _optparse_callable_map = {
"choice": _ARG.NO, "choice": _ARG.NO,
# optparse allows this but does not document it # optparse allows this but does not document it
"str": str, "str": str,
} # type: Dict[str, Union[Type[Any], _ARG]] }
class _CallbackAction(argparse.Action): class _CallbackAction(argparse.Action):
"""Shim for optparse-style callback actions.""" """Shim for optparse-style callback actions."""
def __init__(self, *args, **kwargs): def __init__(self, *args: Any, **kwargs: Any) -> None:
# type: (*Any, **Any) -> None
self._callback = kwargs.pop("callback") self._callback = kwargs.pop("callback")
self._callback_args = kwargs.pop("callback_args", ()) self._callback_args = kwargs.pop("callback_args", ())
self._callback_kwargs = kwargs.pop("callback_kwargs", {}) self._callback_kwargs = kwargs.pop("callback_kwargs", {})
@ -56,12 +55,11 @@ class _CallbackAction(argparse.Action):
def __call__( def __call__(
self, self,
parser, # type: argparse.ArgumentParser parser: argparse.ArgumentParser,
namespace, # type: argparse.Namespace namespace: argparse.Namespace,
values, # type: Optional[Union[Sequence[str], str]] values: Optional[Union[Sequence[str], str]],
option_string=None, # type: Optional[str] option_string: Optional[str] = None,
): ) -> None:
# type: (...) -> None
if not values: if not values:
values = None values = None
elif isinstance(values, list) and len(values) > 1: elif isinstance(values, list) and len(values) > 1:
@ -76,14 +74,15 @@ class _CallbackAction(argparse.Action):
) )
def _flake8_normalize(value, *args, **kwargs): def _flake8_normalize(
# type: (str, *str, **bool) -> Union[str, List[str]] value: str, *args: str, **kwargs: bool
) -> Union[str, List[str]]:
comma_separated_list = kwargs.pop("comma_separated_list", False) comma_separated_list = kwargs.pop("comma_separated_list", False)
normalize_paths = kwargs.pop("normalize_paths", False) normalize_paths = kwargs.pop("normalize_paths", False)
if kwargs: if kwargs:
raise TypeError(f"Unexpected keyword args: {kwargs}") raise TypeError(f"Unexpected keyword args: {kwargs}")
ret = value # type: Union[str, List[str]] ret: Union[str, List[str]] = value
if comma_separated_list and isinstance(ret, str): if comma_separated_list and isinstance(ret, str):
ret = utils.parse_comma_separated_list(value) ret = utils.parse_comma_separated_list(value)
@ -101,29 +100,29 @@ class Option:
def __init__( def __init__(
self, self,
short_option_name=_ARG.NO, # type: Union[str, _ARG] short_option_name: Union[str, _ARG] = _ARG.NO,
long_option_name=_ARG.NO, # type: Union[str, _ARG] long_option_name: Union[str, _ARG] = _ARG.NO,
# Options below here are taken from the optparse.Option class # Options below here are taken from the optparse.Option class
action=_ARG.NO, # type: Union[str, Type[argparse.Action], _ARG] action: Union[str, Type[argparse.Action], _ARG] = _ARG.NO,
default=_ARG.NO, # type: Union[Any, _ARG] default: Union[Any, _ARG] = _ARG.NO,
type=_ARG.NO, # type: Union[str, Callable[..., Any], _ARG] type: Union[str, Callable[..., Any], _ARG] = _ARG.NO,
dest=_ARG.NO, # type: Union[str, _ARG] dest: Union[str, _ARG] = _ARG.NO,
nargs=_ARG.NO, # type: Union[int, str, _ARG] nargs: Union[int, str, _ARG] = _ARG.NO,
const=_ARG.NO, # type: Union[Any, _ARG] const: Union[Any, _ARG] = _ARG.NO,
choices=_ARG.NO, # type: Union[Sequence[Any], _ARG] choices: Union[Sequence[Any], _ARG] = _ARG.NO,
help=_ARG.NO, # type: Union[str, _ARG] help: Union[str, _ARG] = _ARG.NO,
metavar=_ARG.NO, # type: Union[str, _ARG] metavar: Union[str, _ARG] = _ARG.NO,
# deprecated optparse-only options # deprecated optparse-only options
callback=_ARG.NO, # type: Union[Callable[..., Any], _ARG] callback: Union[Callable[..., Any], _ARG] = _ARG.NO,
callback_args=_ARG.NO, # type: Union[Sequence[Any], _ARG] callback_args: Union[Sequence[Any], _ARG] = _ARG.NO,
callback_kwargs=_ARG.NO, # type: Union[Mapping[str, Any], _ARG] callback_kwargs: Union[Mapping[str, Any], _ARG] = _ARG.NO,
# Options below are taken from argparse.ArgumentParser.add_argument # Options below are taken from argparse.ArgumentParser.add_argument
required=_ARG.NO, # type: Union[bool, _ARG] required: Union[bool, _ARG] = _ARG.NO,
# Options below here are specific to Flake8 # Options below here are specific to Flake8
parse_from_config=False, # type: bool parse_from_config: bool = False,
comma_separated_list=False, # type: bool comma_separated_list: bool = False,
normalize_paths=False, # type: bool normalize_paths: bool = False,
): # type: (...) -> None ) -> None:
"""Initialize an Option instance. """Initialize an Option instance.
The following are all passed directly through to argparse. The following are all passed directly through to argparse.
@ -251,7 +250,7 @@ class Option:
self.help = help self.help = help
self.metavar = metavar self.metavar = metavar
self.required = required self.required = required
self.option_kwargs = { self.option_kwargs: Dict[str, Union[Any, _ARG]] = {
"action": self.action, "action": self.action,
"default": self.default, "default": self.default,
"type": self.type, "type": self.type,
@ -265,14 +264,14 @@ class Option:
"help": self.help, "help": self.help,
"metavar": self.metavar, "metavar": self.metavar,
"required": self.required, "required": self.required,
} # type: Dict[str, Union[Any, _ARG]] }
# Set our custom attributes # Set our custom attributes
self.parse_from_config = parse_from_config self.parse_from_config = parse_from_config
self.comma_separated_list = comma_separated_list self.comma_separated_list = comma_separated_list
self.normalize_paths = normalize_paths self.normalize_paths = normalize_paths
self.config_name = None # type: Optional[str] self.config_name: Optional[str] = None
if parse_from_config: if parse_from_config:
if long_option_name is _ARG.NO: if long_option_name is _ARG.NO:
raise ValueError( raise ValueError(
@ -284,13 +283,13 @@ class Option:
self._opt = None self._opt = None
@property @property
def filtered_option_kwargs(self): # type: () -> Dict[str, Any] def filtered_option_kwargs(self) -> Dict[str, Any]:
"""Return any actually-specified arguments.""" """Return any actually-specified arguments."""
return { return {
k: v for k, v in self.option_kwargs.items() if v is not _ARG.NO k: v for k, v in self.option_kwargs.items() if v is not _ARG.NO
} }
def __repr__(self): # type: () -> str # noqa: D105 def __repr__(self) -> str: # noqa: D105
parts = [] parts = []
for arg in self.option_args: for arg in self.option_args:
parts.append(arg) parts.append(arg)
@ -298,8 +297,7 @@ class Option:
parts.append(f"{k}={v!r}") parts.append(f"{k}={v!r}")
return "Option({})".format(", ".join(parts)) return "Option({})".format(", ".join(parts))
def normalize(self, value, *normalize_args): def normalize(self, value: Any, *normalize_args: str) -> Any:
# type: (Any, *str) -> Any
"""Normalize the value based on the option configuration.""" """Normalize the value based on the option configuration."""
if self.comma_separated_list and isinstance(value, str): if self.comma_separated_list and isinstance(value, str):
value = utils.parse_comma_separated_list(value) value = utils.parse_comma_separated_list(value)
@ -312,8 +310,9 @@ class Option:
return value return value
def normalize_from_setuptools(self, value): def normalize_from_setuptools(
# type: (str) -> Union[int, float, complex, bool, str] self, value: str
) -> Union[int, float, complex, bool, str]:
"""Normalize the value received from setuptools.""" """Normalize the value received from setuptools."""
value = self.normalize(value) value = self.normalize(value)
if self.type is int or self.action == "count": if self.type is int or self.action == "count":
@ -330,13 +329,12 @@ class Option:
return False return False
return value return value
def to_argparse(self): def to_argparse(self) -> Tuple[List[str], Dict[str, Any]]:
# type: () -> Tuple[List[str], Dict[str, Any]]
"""Convert a Flake8 Option to argparse ``add_argument`` arguments.""" """Convert a Flake8 Option to argparse ``add_argument`` arguments."""
return self.option_args, self.filtered_option_kwargs return self.option_args, self.filtered_option_kwargs
@property @property
def to_optparse(self): # type: () -> NoReturn def to_optparse(self) -> "NoReturn":
"""No longer functional.""" """No longer functional."""
raise AttributeError("to_optparse: flake8 now uses argparse") raise AttributeError("to_optparse: flake8 now uses argparse")
@ -351,11 +349,11 @@ class OptionManager:
def __init__( def __init__(
self, self,
prog, prog: str,
version, version: str,
usage="%(prog)s [options] file file ...", usage: str = "%(prog)s [options] file file ...",
parents=None, parents: Optional[List[argparse.ArgumentParser]] = None,
): # type: (str, str, str, Optional[List[argparse.ArgumentParser]]) -> None # noqa: E501 ) -> None: # noqa: E501
"""Initialize an instance of an OptionManager. """Initialize an instance of an OptionManager.
:param str prog: :param str prog:
@ -371,10 +369,10 @@ class OptionManager:
if parents is None: if parents is None:
parents = [] parents = []
self.parser = argparse.ArgumentParser( self.parser: argparse.ArgumentParser = argparse.ArgumentParser(
prog=prog, usage=usage, parents=parents prog=prog, usage=usage, parents=parents
) # type: argparse.ArgumentParser )
self._current_group = None # type: Optional[argparse._ArgumentGroup] self._current_group: Optional[argparse._ArgumentGroup] = None
self.version_action = cast( self.version_action = cast(
"argparse._VersionAction", "argparse._VersionAction",
self.parser.add_argument( self.parser.add_argument(
@ -382,16 +380,16 @@ class OptionManager:
), ),
) )
self.parser.add_argument("filenames", nargs="*", metavar="filename") self.parser.add_argument("filenames", nargs="*", metavar="filename")
self.config_options_dict = {} # type: Dict[str, Option] self.config_options_dict: Dict[str, Option] = {}
self.options = [] # type: List[Option] self.options: List[Option] = []
self.program_name = prog self.program_name = prog
self.version = version self.version = version
self.registered_plugins = set() # type: Set[PluginVersion] self.registered_plugins: Set[PluginVersion] = set()
self.extended_default_ignore = set() # type: Set[str] self.extended_default_ignore: Set[str] = set()
self.extended_default_select = set() # type: Set[str] self.extended_default_select: Set[str] = set()
@contextlib.contextmanager @contextlib.contextmanager
def group(self, name): # type: (str) -> Generator[None, None, None] def group(self, name: str) -> Generator[None, None, None]:
"""Attach options to an argparse group during this context.""" """Attach options to an argparse group during this context."""
group = self.parser.add_argument_group(name) group = self.parser.add_argument_group(name)
self._current_group, orig_group = group, self._current_group self._current_group, orig_group = group, self._current_group
@ -400,7 +398,7 @@ class OptionManager:
finally: finally:
self._current_group = orig_group self._current_group = orig_group
def add_option(self, *args, **kwargs): # type: (*Any, **Any) -> None def add_option(self, *args: Any, **kwargs: Any) -> None:
"""Create and register a new option. """Create and register a new option.
See parameters for :class:`~flake8.options.manager.Option` for See parameters for :class:`~flake8.options.manager.Option` for
@ -425,8 +423,7 @@ class OptionManager:
self.config_options_dict[name.replace("_", "-")] = option self.config_options_dict[name.replace("_", "-")] = option
LOG.debug('Registered option "%s".', option) LOG.debug('Registered option "%s".', option)
def remove_from_default_ignore(self, error_codes): def remove_from_default_ignore(self, error_codes: Sequence[str]) -> None:
# type: (Sequence[str]) -> None
"""Remove specified error codes from the default ignore list. """Remove specified error codes from the default ignore list.
:param list error_codes: :param list error_codes:
@ -444,8 +441,7 @@ class OptionManager:
error_code, error_code,
) )
def extend_default_ignore(self, error_codes): def extend_default_ignore(self, error_codes: Sequence[str]) -> None:
# type: (Sequence[str]) -> None
"""Extend the default ignore list with the error codes provided. """Extend the default ignore list with the error codes provided.
:param list error_codes: :param list error_codes:
@ -455,8 +451,7 @@ class OptionManager:
LOG.debug("Extending default ignore list with %r", error_codes) LOG.debug("Extending default ignore list with %r", error_codes)
self.extended_default_ignore.update(error_codes) self.extended_default_ignore.update(error_codes)
def extend_default_select(self, error_codes): def extend_default_select(self, error_codes: Sequence[str]) -> None:
# type: (Sequence[str]) -> None
"""Extend the default select list with the error codes provided. """Extend the default select list with the error codes provided.
:param list error_codes: :param list error_codes:
@ -467,22 +462,21 @@ class OptionManager:
self.extended_default_select.update(error_codes) self.extended_default_select.update(error_codes)
def generate_versions( def generate_versions(
self, format_str="%(name)s: %(version)s", join_on=", " self, format_str: str = "%(name)s: %(version)s", join_on: str = ", "
): ) -> str:
# type: (str, str) -> str
"""Generate a comma-separated list of versions of plugins.""" """Generate a comma-separated list of versions of plugins."""
return join_on.join( return join_on.join(
format_str % plugin._asdict() format_str % plugin._asdict()
for plugin in sorted(self.registered_plugins) for plugin in sorted(self.registered_plugins)
) )
def update_version_string(self): # type: () -> None def update_version_string(self) -> None:
"""Update the flake8 version string.""" """Update the flake8 version string."""
self.version_action.version = "{} ({}) {}".format( self.version_action.version = "{} ({}) {}".format(
self.version, self.generate_versions(), utils.get_python_version() self.version, self.generate_versions(), utils.get_python_version()
) )
def generate_epilog(self): # type: () -> None def generate_epilog(self) -> None:
"""Create an epilog with the version and name of each of plugin.""" """Create an epilog with the version and name of each of plugin."""
plugin_version_format = "%(name)s: %(version)s" plugin_version_format = "%(name)s: %(version)s"
self.parser.epilog = "Installed plugins: " + self.generate_versions( self.parser.epilog = "Installed plugins: " + self.generate_versions(
@ -491,10 +485,9 @@ class OptionManager:
def parse_args( def parse_args(
self, self,
args=None, # type: Optional[List[str]] args: Optional[List[str]] = None,
values=None, # type: Optional[argparse.Namespace] values: Optional[argparse.Namespace] = None,
): ) -> Tuple[argparse.Namespace, List[str]]:
# type: (...) -> Tuple[argparse.Namespace, List[str]]
"""Proxy to calling the OptionParser's parse_args method.""" """Proxy to calling the OptionParser's parse_args method."""
self.generate_epilog() self.generate_epilog()
self.update_version_string() self.update_version_string()
@ -504,8 +497,9 @@ class OptionManager:
# TODO: refactor callers to not need this # TODO: refactor callers to not need this
return parsed_args, parsed_args.filenames return parsed_args, parsed_args.filenames
def parse_known_args(self, args=None): def parse_known_args(
# type: (Optional[List[str]]) -> Tuple[argparse.Namespace, List[str]] self, args: Optional[List[str]] = None
) -> Tuple[argparse.Namespace, List[str]]:
"""Parse only the known arguments from the argument values. """Parse only the known arguments from the argument values.
Replicate a little argparse behaviour while we're still on Replicate a little argparse behaviour while we're still on
@ -515,8 +509,9 @@ class OptionManager:
self.update_version_string() self.update_version_string()
return self.parser.parse_known_args(args) return self.parser.parse_known_args(args)
def register_plugin(self, name, version, local=False): def register_plugin(
# type: (str, str, bool) -> None self, name: str, version: str, local: bool = False
) -> None:
"""Register a plugin relying on the OptionManager. """Register a plugin relying on the OptionManager.
:param str name: :param str name:

View file

@ -35,14 +35,14 @@ class Plugin:
self.name = name self.name = name
self.entry_point = entry_point self.entry_point = entry_point
self.local = local self.local = local
self._plugin = None # type: Any self._plugin: Any = None
self._parameters = None self._parameters = None
self._parameter_names = None # type: Optional[List[str]] self._parameter_names: Optional[List[str]] = None
self._group = None self._group = None
self._plugin_name = None self._plugin_name = None
self._version = None self._version = None
def __repr__(self): # type: () -> str def __repr__(self) -> str:
"""Provide an easy to read description of the current plugin.""" """Provide an easy to read description of the current plugin."""
return 'Plugin(name="{}", entry_point="{}")'.format( return 'Plugin(name="{}", entry_point="{}")'.format(
self.name, self.entry_point.value self.name, self.entry_point.value
@ -88,7 +88,7 @@ class Plugin:
return self._parameters return self._parameters
@property @property
def parameter_names(self): # type: () -> List[str] def parameter_names(self) -> List[str]:
"""List of argument names that need to be passed to the plugin.""" """List of argument names that need to be passed to the plugin."""
if self._parameter_names is None: if self._parameter_names is None:
self._parameter_names = list(self.parameters) self._parameter_names = list(self.parameters)
@ -104,7 +104,7 @@ class Plugin:
return self._plugin return self._plugin
@property @property
def version(self): # type: () -> str def version(self) -> str:
"""Return the version of the plugin.""" """Return the version of the plugin."""
version = self._version version = self._version
if version is None: if version is None:
@ -226,8 +226,9 @@ class Plugin:
class PluginManager: # pylint: disable=too-few-public-methods class PluginManager: # pylint: disable=too-few-public-methods
"""Find and manage plugins consistently.""" """Find and manage plugins consistently."""
def __init__(self, namespace, local_plugins=None): def __init__(
# type: (str, Optional[List[str]]) -> None self, namespace: str, local_plugins: Optional[List[str]] = None
) -> None:
"""Initialize the manager. """Initialize the manager.
:param str namespace: :param str namespace:
@ -236,8 +237,8 @@ class PluginManager: # pylint: disable=too-few-public-methods
Plugins from config (as "X = path.to:Plugin" strings). Plugins from config (as "X = path.to:Plugin" strings).
""" """
self.namespace = namespace self.namespace = namespace
self.plugins = {} # type: Dict[str, Plugin] self.plugins: Dict[str, Plugin] = {}
self.names = [] # type: List[str] self.names: List[str] = []
self._load_local_plugins(local_plugins or []) self._load_local_plugins(local_plugins or [])
self._load_entrypoint_plugins() self._load_entrypoint_plugins()
@ -314,7 +315,7 @@ class PluginManager: # pylint: disable=too-few-public-methods
:rtype: :rtype:
tuple tuple
""" """
plugins_seen = set() # type: Set[str] plugins_seen: Set[str] = set()
for entry_point_name in self.names: for entry_point_name in self.names:
plugin = self.plugins[entry_point_name] plugin = self.plugins[entry_point_name]
plugin_name = plugin.plugin_name plugin_name = plugin.plugin_name
@ -349,7 +350,7 @@ def version_for(plugin):
class PluginTypeManager: class PluginTypeManager:
"""Parent class for most of the specific plugin types.""" """Parent class for most of the specific plugin types."""
namespace = None # type: str namespace: str
def __init__(self, local_plugins=None): def __init__(self, local_plugins=None):
"""Initialize the plugin type's manager. """Initialize the plugin type's manager.

View file

@ -1,9 +1,7 @@
"""Plugin built-in to Flake8 to treat pyflakes as a plugin.""" """Plugin built-in to Flake8 to treat pyflakes as a plugin."""
import os import os
from typing import List from typing import List
import pyflakes
import pyflakes.checker import pyflakes.checker
from flake8 import utils from flake8 import utils
@ -66,8 +64,8 @@ class FlakesChecker(pyflakes.checker.Checker):
name = "pyflakes" name = "pyflakes"
version = pyflakes.__version__ version = pyflakes.__version__
with_doctest = False with_doctest = False
include_in_doctest = [] # type: List[str] include_in_doctest: List[str] = []
exclude_from_doctest = [] # type: List[str] exclude_from_doctest: List[str] = []
def __init__(self, tree, file_tokens, filename): def __init__(self, tree, file_tokens, filename):
"""Initialize the PyFlakes plugin with an AST tree and filename.""" """Initialize the PyFlakes plugin with an AST tree and filename."""

View file

@ -60,8 +60,12 @@ class FileProcessor:
#: always ``False``, included for compatibility #: always ``False``, included for compatibility
noqa = False noqa = False
def __init__(self, filename, options, lines=None): def __init__(
# type: (str, argparse.Namespace, Optional[List[str]]) -> None self,
filename: str,
options: argparse.Namespace,
lines: Optional[List[str]] = None,
) -> None:
"""Initialice our file processor. """Initialice our file processor.
:param str filename: :param str filename:
@ -78,13 +82,13 @@ class FileProcessor:
#: Number of blank lines #: Number of blank lines
self.blank_lines = 0 self.blank_lines = 0
#: Checker states for each plugin? #: Checker states for each plugin?
self._checker_states = {} # type: Dict[str, Dict[Any, Any]] self._checker_states: Dict[str, Dict[Any, Any]] = {}
#: Current checker state #: Current checker state
self.checker_state = {} # type: Dict[Any, Any] self.checker_state: Dict[Any, Any] = {}
#: User provided option for hang closing #: User provided option for hang closing
self.hang_closing = options.hang_closing self.hang_closing = options.hang_closing
#: Character used for indentation #: Character used for indentation
self.indent_char = None # type: Optional[str] self.indent_char: Optional[str] = None
#: Current level of indentation #: Current level of indentation
self.indent_level = 0 self.indent_level = 0
#: Number of spaces used for indentation #: Number of spaces used for indentation
@ -108,19 +112,19 @@ class FileProcessor:
#: Previous unindented (i.e. top-level) logical line #: Previous unindented (i.e. top-level) logical line
self.previous_unindented_logical_line = "" self.previous_unindented_logical_line = ""
#: Current set of tokens #: Current set of tokens
self.tokens = [] # type: List[_Token] self.tokens: List[_Token] = []
#: Total number of lines in the file #: Total number of lines in the file
self.total_lines = len(self.lines) self.total_lines = len(self.lines)
#: Verbosity level of Flake8 #: Verbosity level of Flake8
self.verbose = options.verbose self.verbose = options.verbose
#: Statistics dictionary #: Statistics dictionary
self.statistics = {"logical lines": 0} self.statistics = {"logical lines": 0}
self._file_tokens = None # type: Optional[List[_Token]] self._file_tokens: Optional[List[_Token]] = None
# map from line number to the line we'll search for `noqa` in # map from line number to the line we'll search for `noqa` in
self._noqa_line_mapping = None # type: Optional[Dict[int, str]] self._noqa_line_mapping: Optional[Dict[int, str]] = None
@property @property
def file_tokens(self): # type: () -> List[_Token] def file_tokens(self) -> List[_Token]:
"""Return the complete set of tokens for a file. """Return the complete set of tokens for a file.
Accessing this attribute *may* raise an InvalidSyntax exception. Accessing this attribute *may* raise an InvalidSyntax exception.
@ -139,28 +143,28 @@ class FileProcessor:
return self._file_tokens return self._file_tokens
@contextlib.contextmanager @contextlib.contextmanager
def inside_multiline(self, line_number): def inside_multiline(
# type: (int) -> Generator[None, None, None] self, line_number: int
) -> Generator[None, None, None]:
"""Context-manager to toggle the multiline attribute.""" """Context-manager to toggle the multiline attribute."""
self.line_number = line_number self.line_number = line_number
self.multiline = True self.multiline = True
yield yield
self.multiline = False self.multiline = False
def reset_blank_before(self): # type: () -> None def reset_blank_before(self) -> None:
"""Reset the blank_before attribute to zero.""" """Reset the blank_before attribute to zero."""
self.blank_before = 0 self.blank_before = 0
def delete_first_token(self): # type: () -> None def delete_first_token(self) -> None:
"""Delete the first token in the list of tokens.""" """Delete the first token in the list of tokens."""
del self.tokens[0] del self.tokens[0]
def visited_new_blank_line(self): # type: () -> None def visited_new_blank_line(self) -> None:
"""Note that we visited a new blank line.""" """Note that we visited a new blank line."""
self.blank_lines += 1 self.blank_lines += 1
def update_state(self, mapping): def update_state(self, mapping: _LogicalMapping) -> None:
# type: (_LogicalMapping) -> None
"""Update the indent level based on the logical line mapping.""" """Update the indent level based on the logical line mapping."""
(start_row, start_col) = mapping[0][1] (start_row, start_col) = mapping[0][1]
start_line = self.lines[start_row - 1] start_line = self.lines[start_row - 1]
@ -168,15 +172,14 @@ class FileProcessor:
if self.blank_before < self.blank_lines: if self.blank_before < self.blank_lines:
self.blank_before = self.blank_lines self.blank_before = self.blank_lines
def update_checker_state_for(self, plugin): def update_checker_state_for(self, plugin: Dict[str, Any]) -> None:
# type: (Dict[str, Any]) -> None
"""Update the checker_state attribute for the plugin.""" """Update the checker_state attribute for the plugin."""
if "checker_state" in plugin["parameters"]: if "checker_state" in plugin["parameters"]:
self.checker_state = self._checker_states.setdefault( self.checker_state = self._checker_states.setdefault(
plugin["name"], {} plugin["name"], {}
) )
def next_logical_line(self): # type: () -> None def next_logical_line(self) -> None:
"""Record the previous logical line. """Record the previous logical line.
This also resets the tokens list and the blank_lines count. This also resets the tokens list and the blank_lines count.
@ -189,11 +192,11 @@ class FileProcessor:
self.blank_lines = 0 self.blank_lines = 0
self.tokens = [] self.tokens = []
def build_logical_line_tokens(self): # type: () -> _Logical def build_logical_line_tokens(self) -> _Logical:
"""Build the mapping, comments, and logical line lists.""" """Build the mapping, comments, and logical line lists."""
logical = [] logical = []
comments = [] comments = []
mapping = [] # type: _LogicalMapping mapping: _LogicalMapping = []
length = 0 length = 0
previous_row = previous_column = None previous_row = previous_column = None
for token_type, text, start, end, line in self.tokens: for token_type, text, start, end, line in self.tokens:
@ -224,12 +227,11 @@ class FileProcessor:
(previous_row, previous_column) = end (previous_row, previous_column) = end
return comments, logical, mapping return comments, logical, mapping
def build_ast(self): # type: () -> ast.AST def build_ast(self) -> ast.AST:
"""Build an abstract syntax tree from the list of lines.""" """Build an abstract syntax tree from the list of lines."""
return ast.parse("".join(self.lines)) return ast.parse("".join(self.lines))
def build_logical_line(self): def build_logical_line(self) -> Tuple[str, str, _LogicalMapping]:
# type: () -> Tuple[str, str, _LogicalMapping]
"""Build a logical line from the current tokens list.""" """Build a logical line from the current tokens list."""
comments, logical, mapping_list = self.build_logical_line_tokens() comments, logical, mapping_list = self.build_logical_line_tokens()
joined_comments = "".join(comments) joined_comments = "".join(comments)
@ -237,8 +239,7 @@ class FileProcessor:
self.statistics["logical lines"] += 1 self.statistics["logical lines"] += 1
return joined_comments, self.logical_line, mapping_list return joined_comments, self.logical_line, mapping_list
def split_line(self, token): def split_line(self, token: _Token) -> Generator[str, None, None]:
# type: (_Token) -> Generator[str, None, None]
"""Split a physical line's line based on new-lines. """Split a physical line's line based on new-lines.
This also auto-increments the line number for the caller. This also auto-increments the line number for the caller.
@ -247,8 +248,11 @@ class FileProcessor:
yield line yield line
self.line_number += 1 self.line_number += 1
def keyword_arguments_for(self, parameters, arguments=None): def keyword_arguments_for(
# type: (Dict[str, bool], Optional[Dict[str, Any]]) -> Dict[str, Any] self,
parameters: Dict[str, bool],
arguments: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Generate the keyword arguments for a list of parameters.""" """Generate the keyword arguments for a list of parameters."""
if arguments is None: if arguments is None:
arguments = {} arguments = {}
@ -269,7 +273,7 @@ class FileProcessor:
) )
return arguments return arguments
def generate_tokens(self): # type: () -> Generator[_Token, None, None] def generate_tokens(self) -> Generator[_Token, None, None]:
"""Tokenize the file and yield the tokens. """Tokenize the file and yield the tokens.
:raises flake8.exceptions.InvalidSyntax: :raises flake8.exceptions.InvalidSyntax:
@ -285,13 +289,14 @@ class FileProcessor:
except (tokenize.TokenError, SyntaxError) as exc: except (tokenize.TokenError, SyntaxError) as exc:
raise exceptions.InvalidSyntax(exception=exc) raise exceptions.InvalidSyntax(exception=exc)
def _noqa_line_range(self, min_line, max_line): def _noqa_line_range(
# type: (int, int) -> Dict[int, str] self, min_line: int, max_line: int
) -> Dict[int, str]:
line_range = range(min_line, max_line + 1) line_range = range(min_line, max_line + 1)
joined = "".join(self.lines[min_line - 1 : max_line]) joined = "".join(self.lines[min_line - 1 : max_line])
return dict.fromkeys(line_range, joined) return dict.fromkeys(line_range, joined)
def noqa_line_for(self, line_number): # type: (int) -> Optional[str] def noqa_line_for(self, line_number: int) -> Optional[str]:
"""Retrieve the line which will be used to determine noqa.""" """Retrieve the line which will be used to determine noqa."""
if self._noqa_line_mapping is None: if self._noqa_line_mapping is None:
try: try:
@ -331,7 +336,7 @@ class FileProcessor:
# retrieve a physical line (since none exist). # retrieve a physical line (since none exist).
return self._noqa_line_mapping.get(line_number) return self._noqa_line_mapping.get(line_number)
def next_line(self): # type: () -> str def next_line(self) -> str:
"""Get the next line from the list.""" """Get the next line from the list."""
if self.line_number >= self.total_lines: if self.line_number >= self.total_lines:
return "" return ""
@ -341,8 +346,7 @@ class FileProcessor:
self.indent_char = line[0] self.indent_char = line[0]
return line return line
def read_lines(self): def read_lines(self) -> List[str]:
# type: () -> List[str]
"""Read the lines for this file checker.""" """Read the lines for this file checker."""
if self.filename is None or self.filename == "-": if self.filename is None or self.filename == "-":
self.filename = self.options.stdin_display_name or "stdin" self.filename = self.options.stdin_display_name or "stdin"
@ -351,8 +355,7 @@ class FileProcessor:
lines = self.read_lines_from_filename() lines = self.read_lines_from_filename()
return lines return lines
def read_lines_from_filename(self): def read_lines_from_filename(self) -> List[str]:
# type: () -> List[str]
"""Read the lines for a file.""" """Read the lines for a file."""
try: try:
with tokenize.open(self.filename) as fd: with tokenize.open(self.filename) as fd:
@ -363,13 +366,11 @@ class FileProcessor:
with open(self.filename, encoding="latin-1") as fd: with open(self.filename, encoding="latin-1") as fd:
return fd.readlines() return fd.readlines()
def read_lines_from_stdin(self): def read_lines_from_stdin(self) -> List[str]:
# type: () -> List[str]
"""Read the lines from standard in.""" """Read the lines from standard in."""
return utils.stdin_get_lines() return utils.stdin_get_lines()
def should_ignore_file(self): def should_ignore_file(self) -> bool:
# type: () -> bool
"""Check if ``flake8: noqa`` is in the file to be ignored. """Check if ``flake8: noqa`` is in the file to be ignored.
:returns: :returns:
@ -391,8 +392,7 @@ class FileProcessor:
else: else:
return False return False
def strip_utf_bom(self): def strip_utf_bom(self) -> None:
# type: () -> None
"""Strip the UTF bom from the lines of the file.""" """Strip the UTF bom from the lines of the file."""
if not self.lines: if not self.lines:
# If we have nothing to analyze quit early # If we have nothing to analyze quit early
@ -409,23 +409,22 @@ class FileProcessor:
self.lines[0] = self.lines[0][3:] self.lines[0] = self.lines[0][3:]
def is_eol_token(token): # type: (_Token) -> bool def is_eol_token(token: _Token) -> bool:
"""Check if the token is an end-of-line token.""" """Check if the token is an end-of-line token."""
return token[0] in NEWLINE or token[4][token[3][1] :].lstrip() == "\\\n" return token[0] in NEWLINE or token[4][token[3][1] :].lstrip() == "\\\n"
def is_multiline_string(token): # type: (_Token) -> bool def is_multiline_string(token: _Token) -> bool:
"""Check if this is a multiline string.""" """Check if this is a multiline string."""
return token[0] == tokenize.STRING and "\n" in token[1] return token[0] == tokenize.STRING and "\n" in token[1]
def token_is_newline(token): # type: (_Token) -> bool def token_is_newline(token: _Token) -> bool:
"""Check if the token type is a newline token type.""" """Check if the token type is a newline token type."""
return token[0] in NEWLINE return token[0] in NEWLINE
def count_parentheses(current_parentheses_count, token_text): def count_parentheses(current_parentheses_count: int, token_text: str) -> int:
# type: (int, str) -> int
"""Count the number of parentheses.""" """Count the number of parentheses."""
if token_text in "([{": # nosec if token_text in "([{": # nosec
return current_parentheses_count + 1 return current_parentheses_count + 1
@ -434,7 +433,7 @@ def count_parentheses(current_parentheses_count, token_text):
return current_parentheses_count return current_parentheses_count
def log_token(log, token): # type: (logging.Logger, _Token) -> None def log_token(log: logging.Logger, token: _Token) -> None:
"""Log a token to a provided logging object.""" """Log a token to a provided logging object."""
if token[2][0] == token[3][0]: if token[2][0] == token[3][0]:
pos = "[{}:{}]".format(token[2][1] or "", token[3][1]) pos = "[{}:{}]".format(token[2][1] or "", token[3][1])
@ -449,7 +448,7 @@ def log_token(log, token): # type: (logging.Logger, _Token) -> None
# NOTE(sigmavirus24): This was taken wholesale from # NOTE(sigmavirus24): This was taken wholesale from
# https://github.com/PyCQA/pycodestyle # https://github.com/PyCQA/pycodestyle
def expand_indent(line): # type: (str) -> int def expand_indent(line: str) -> int:
r"""Return the amount of indentation. r"""Return the amount of indentation.
Tabs are expanded to the next multiple of 8. Tabs are expanded to the next multiple of 8.
@ -479,7 +478,7 @@ def expand_indent(line): # type: (str) -> int
# NOTE(sigmavirus24): This was taken wholesale from # NOTE(sigmavirus24): This was taken wholesale from
# https://github.com/PyCQA/pycodestyle. The in-line comments were edited to be # https://github.com/PyCQA/pycodestyle. The in-line comments were edited to be
# more descriptive. # more descriptive.
def mutate_string(text): # type: (str) -> str def mutate_string(text: str) -> str:
"""Replace contents with 'xxx' to prevent syntax matching. """Replace contents with 'xxx' to prevent syntax matching.
>>> mutate_string('"abc"') >>> mutate_string('"abc"')

View file

@ -13,11 +13,11 @@ if TYPE_CHECKING:
class Statistics: class Statistics:
"""Manager of aggregated statistics for a run of Flake8.""" """Manager of aggregated statistics for a run of Flake8."""
def __init__(self): # type: () -> None def __init__(self) -> None:
"""Initialize the underlying dictionary for our statistics.""" """Initialize the underlying dictionary for our statistics."""
self._store = {} # type: Dict[Key, Statistic] self._store: Dict[Key, "Statistic"] = {}
def error_codes(self): # type: () -> List[str] def error_codes(self) -> List[str]:
"""Return all unique error codes stored. """Return all unique error codes stored.
:returns: :returns:
@ -27,7 +27,7 @@ class Statistics:
""" """
return sorted({key.code for key in self._store}) return sorted({key.code for key in self._store})
def record(self, error): # type: (Violation) -> None def record(self, error: "Violation") -> None:
"""Add the fact that the error was seen in the file. """Add the fact that the error was seen in the file.
:param error: :param error:
@ -41,8 +41,9 @@ class Statistics:
self._store[key] = Statistic.create_from(error) self._store[key] = Statistic.create_from(error)
self._store[key].increment() self._store[key].increment()
def statistics_for(self, prefix, filename=None): def statistics_for(
# type: (str, Optional[str]) -> Generator[Statistic, None, None] self, prefix: str, filename: Optional[str] = None
) -> Generator["Statistic", None, None]:
"""Generate statistics for the prefix and filename. """Generate statistics for the prefix and filename.
If you have a :class:`Statistics` object that has recorded errors, If you have a :class:`Statistics` object that has recorded errors,
@ -83,11 +84,11 @@ class Key(collections.namedtuple("Key", ["filename", "code"])):
__slots__ = () __slots__ = ()
@classmethod @classmethod
def create_from(cls, error): # type: (Violation) -> Key def create_from(cls, error: "Violation") -> "Key":
"""Create a Key from :class:`flake8.style_guide.Violation`.""" """Create a Key from :class:`flake8.style_guide.Violation`."""
return cls(filename=error.filename, code=error.code) return cls(filename=error.filename, code=error.code)
def matches(self, prefix, filename): # type: (str, Optional[str]) -> bool def matches(self, prefix: str, filename: Optional[str]) -> bool:
"""Determine if this key matches some constraints. """Determine if this key matches some constraints.
:param str prefix: :param str prefix:
@ -114,8 +115,9 @@ class Statistic:
convenience methods on it. convenience methods on it.
""" """
def __init__(self, error_code, filename, message, count): def __init__(
# type: (str, str, str, int) -> None self, error_code: str, filename: str, message: str, count: int
) -> None:
"""Initialize our Statistic.""" """Initialize our Statistic."""
self.error_code = error_code self.error_code = error_code
self.filename = filename self.filename = filename
@ -123,7 +125,7 @@ class Statistic:
self.count = count self.count = count
@classmethod @classmethod
def create_from(cls, error): # type: (Violation) -> Statistic def create_from(cls, error: "Violation") -> "Statistic":
"""Create a Statistic from a :class:`flake8.style_guide.Violation`.""" """Create a Statistic from a :class:`flake8.style_guide.Violation`."""
return cls( return cls(
error_code=error.code, error_code=error.code,
@ -132,6 +134,6 @@ class Statistic:
count=0, count=0,
) )
def increment(self): # type: () -> None def increment(self) -> None:
"""Increment the number of times we've seen this error in this file.""" """Increment the number of times we've seen this error in this file."""
self.count += 1 self.count += 1

View file

@ -50,7 +50,7 @@ class Decision(enum.Enum):
@functools.lru_cache(maxsize=512) @functools.lru_cache(maxsize=512)
def find_noqa(physical_line): # type: (str) -> Optional[Match[str]] def find_noqa(physical_line: str) -> Optional[Match[str]]:
return defaults.NOQA_INLINE_REGEXP.search(physical_line) return defaults.NOQA_INLINE_REGEXP.search(physical_line)
@ -70,8 +70,7 @@ _Violation = collections.namedtuple(
class Violation(_Violation): class Violation(_Violation):
"""Class representing a violation reported by Flake8.""" """Class representing a violation reported by Flake8."""
def is_inline_ignored(self, disable_noqa): def is_inline_ignored(self, disable_noqa: bool) -> bool:
# type: (bool) -> bool
"""Determine if a comment has been added to ignore this line. """Determine if a comment has been added to ignore this line.
:param bool disable_noqa: :param bool disable_noqa:
@ -112,8 +111,7 @@ class Violation(_Violation):
) )
return False return False
def is_in(self, diff): def is_in(self, diff: Dict[str, Set[int]]) -> bool:
# type: (Dict[str, Set[int]]) -> bool
"""Determine if the violation is included in a diff's line ranges. """Determine if the violation is included in a diff's line ranges.
This function relies on the parsed data added via This function relies on the parsed data added via
@ -156,9 +154,9 @@ class DecisionEngine:
ignored. ignored.
""" """
def __init__(self, options): # type: (argparse.Namespace) -> None def __init__(self, options: argparse.Namespace) -> None:
"""Initialize the engine.""" """Initialize the engine."""
self.cache = {} # type: Dict[str, Decision] self.cache: Dict[str, Decision] = {}
self.selected = tuple(options.select) self.selected = tuple(options.select)
self.extended_selected = tuple( self.extended_selected = tuple(
sorted(options.extended_default_select, reverse=True) sorted(options.extended_default_select, reverse=True)
@ -176,16 +174,15 @@ class DecisionEngine:
self.using_default_ignore = set(self.ignored) == set(defaults.IGNORE) self.using_default_ignore = set(self.ignored) == set(defaults.IGNORE)
self.using_default_select = set(self.selected) == set(defaults.SELECT) self.using_default_select = set(self.selected) == set(defaults.SELECT)
def _in_all_selected(self, code): # type: (str) -> bool def _in_all_selected(self, code: str) -> bool:
return bool(self.all_selected) and code.startswith(self.all_selected) return bool(self.all_selected) and code.startswith(self.all_selected)
def _in_extended_selected(self, code): # type: (str) -> bool def _in_extended_selected(self, code: str) -> bool:
return bool(self.extended_selected) and code.startswith( return bool(self.extended_selected) and code.startswith(
self.extended_selected self.extended_selected
) )
def was_selected(self, code): def was_selected(self, code: str) -> Union[Selected, Ignored]:
# type: (str) -> Union[Selected, Ignored]
"""Determine if the code has been selected by the user. """Determine if the code has been selected by the user.
:param str code: :param str code:
@ -208,8 +205,7 @@ class DecisionEngine:
return Ignored.Implicitly return Ignored.Implicitly
def was_ignored(self, code): def was_ignored(self, code: str) -> Union[Selected, Ignored]:
# type: (str) -> Union[Selected, Ignored]
"""Determine if the code has been ignored by the user. """Determine if the code has been ignored by the user.
:param str code: :param str code:
@ -226,8 +222,7 @@ class DecisionEngine:
return Selected.Implicitly return Selected.Implicitly
def more_specific_decision_for(self, code): def more_specific_decision_for(self, code: str) -> Decision:
# type: (str) -> Decision
select = find_first_match(code, self.all_selected) select = find_first_match(code, self.all_selected)
extra_select = find_first_match(code, self.extended_selected) extra_select = find_first_match(code, self.extended_selected)
ignore = find_first_match(code, self.ignored) ignore = find_first_match(code, self.ignored)
@ -275,8 +270,7 @@ class DecisionEngine:
return Decision.Ignored return Decision.Ignored
return Decision.Selected return Decision.Selected
def make_decision(self, code): def make_decision(self, code: str) -> Decision:
# type: (str) -> Decision
"""Decide if code should be ignored or selected.""" """Decide if code should be ignored or selected."""
LOG.debug('Deciding if "%s" should be reported', code) LOG.debug('Deciding if "%s" should be reported', code)
selected = self.was_selected(code) selected = self.was_selected(code)
@ -302,8 +296,7 @@ class DecisionEngine:
decision = Decision.Ignored # pylint: disable=R0204 decision = Decision.Ignored # pylint: disable=R0204
return decision return decision
def decision_for(self, code): def decision_for(self, code: str) -> Decision:
# type: (str) -> Decision
"""Return the decision for a specific code. """Return the decision for a specific code.
This method caches the decisions for codes to avoid retracing the same This method caches the decisions for codes to avoid retracing the same
@ -330,10 +323,10 @@ class StyleGuideManager:
def __init__( def __init__(
self, self,
options, # type: argparse.Namespace options: argparse.Namespace,
formatter, # type: base_formatter.BaseFormatter formatter: base_formatter.BaseFormatter,
decider=None, # type: Optional[DecisionEngine] decider: Optional[DecisionEngine] = None,
): # type: (...) -> None ) -> None:
"""Initialize our StyleGuide. """Initialize our StyleGuide.
.. todo:: Add parameter documentation. .. todo:: Add parameter documentation.
@ -342,7 +335,7 @@ class StyleGuideManager:
self.formatter = formatter self.formatter = formatter
self.stats = statistics.Statistics() self.stats = statistics.Statistics()
self.decider = decider or DecisionEngine(options) self.decider = decider or DecisionEngine(options)
self.style_guides = [] # type: List[StyleGuide] self.style_guides: List[StyleGuide] = []
self.default_style_guide = StyleGuide( self.default_style_guide = StyleGuide(
options, formatter, self.stats, decider=decider options, formatter, self.stats, decider=decider
) )
@ -353,8 +346,9 @@ class StyleGuideManager:
) )
) )
def populate_style_guides_with(self, options): def populate_style_guides_with(
# type: (argparse.Namespace) -> Generator[StyleGuide, None, None] self, options: argparse.Namespace
) -> Generator["StyleGuide", None, None]:
"""Generate style guides from the per-file-ignores option. """Generate style guides from the per-file-ignores option.
:param options: :param options:
@ -375,7 +369,7 @@ class StyleGuideManager:
) )
@functools.lru_cache(maxsize=None) @functools.lru_cache(maxsize=None)
def style_guide_for(self, filename): # type: (str) -> StyleGuide def style_guide_for(self, filename: str) -> "StyleGuide":
"""Find the StyleGuide for the filename in particular.""" """Find the StyleGuide for the filename in particular."""
guides = sorted( guides = sorted(
(g for g in self.style_guides if g.applies_to(filename)), (g for g in self.style_guides if g.applies_to(filename)),
@ -386,8 +380,9 @@ class StyleGuideManager:
return guides[0] return guides[0]
@contextlib.contextmanager @contextlib.contextmanager
def processing_file(self, filename): def processing_file(
# type: (str) -> Generator[StyleGuide, None, None] self, filename: str
) -> Generator["StyleGuide", None, None]:
"""Record the fact that we're processing the file's results.""" """Record the fact that we're processing the file's results."""
guide = self.style_guide_for(filename) guide = self.style_guide_for(filename)
with guide.processing_file(filename): with guide.processing_file(filename):
@ -395,14 +390,13 @@ class StyleGuideManager:
def handle_error( def handle_error(
self, self,
code, code: str,
filename, filename: str,
line_number, line_number: int,
column_number, column_number: Optional[int],
text, text: str,
physical_line=None, physical_line: Optional[str] = None,
): ) -> int:
# type: (str, str, int, Optional[int], str, Optional[str]) -> int
"""Handle an error reported by a check. """Handle an error reported by a check.
:param str code: :param str code:
@ -430,8 +424,7 @@ class StyleGuideManager:
code, filename, line_number, column_number, text, physical_line code, filename, line_number, column_number, text, physical_line
) )
def add_diff_ranges(self, diffinfo): def add_diff_ranges(self, diffinfo: Dict[str, Set[int]]) -> None:
# type: (Dict[str, Set[int]]) -> None
"""Update the StyleGuides to filter out information not in the diff. """Update the StyleGuides to filter out information not in the diff.
This provides information to the underlying StyleGuides so that only This provides information to the underlying StyleGuides so that only
@ -449,11 +442,11 @@ class StyleGuide:
def __init__( def __init__(
self, self,
options, # type: argparse.Namespace options: argparse.Namespace,
formatter, # type: base_formatter.BaseFormatter formatter: base_formatter.BaseFormatter,
stats, # type: statistics.Statistics stats: statistics.Statistics,
filename=None, # type: Optional[str] filename: Optional[str] = None,
decider=None, # type: Optional[DecisionEngine] decider: Optional[DecisionEngine] = None,
): ):
"""Initialize our StyleGuide. """Initialize our StyleGuide.
@ -466,14 +459,17 @@ class StyleGuide:
self.filename = filename self.filename = filename
if self.filename: if self.filename:
self.filename = utils.normalize_path(self.filename) self.filename = utils.normalize_path(self.filename)
self._parsed_diff = {} # type: Dict[str, Set[int]] self._parsed_diff: Dict[str, Set[int]] = {}
def __repr__(self): # type: () -> str def __repr__(self) -> str:
"""Make it easier to debug which StyleGuide we're using.""" """Make it easier to debug which StyleGuide we're using."""
return f"<StyleGuide [{self.filename}]>" return f"<StyleGuide [{self.filename}]>"
def copy(self, filename=None, extend_ignore_with=None): def copy(
# type: (Optional[str], Optional[Sequence[str]]) -> StyleGuide self,
filename: Optional[str] = None,
extend_ignore_with: Optional[Sequence[str]] = None,
) -> "StyleGuide":
"""Create a copy of this style guide with different values.""" """Create a copy of this style guide with different values."""
filename = filename or self.filename filename = filename or self.filename
options = copy.deepcopy(self.options) options = copy.deepcopy(self.options)
@ -483,14 +479,15 @@ class StyleGuide:
) )
@contextlib.contextmanager @contextlib.contextmanager
def processing_file(self, filename): def processing_file(
# type: (str) -> Generator[StyleGuide, None, None] self, filename: str
) -> Generator["StyleGuide", None, None]:
"""Record the fact that we're processing the file's results.""" """Record the fact that we're processing the file's results."""
self.formatter.beginning(filename) self.formatter.beginning(filename)
yield self yield self
self.formatter.finished(filename) self.formatter.finished(filename)
def applies_to(self, filename): # type: (str) -> bool def applies_to(self, filename: str) -> bool:
"""Check if this StyleGuide applies to the file. """Check if this StyleGuide applies to the file.
:param str filename: :param str filename:
@ -510,8 +507,7 @@ class StyleGuide:
logger=LOG, logger=LOG,
) )
def should_report_error(self, code): def should_report_error(self, code: str) -> Decision:
# type: (str) -> Decision
"""Determine if the error code should be reported or ignored. """Determine if the error code should be reported or ignored.
This method only cares about the select and ignore rules as specified This method only cares about the select and ignore rules as specified
@ -527,14 +523,13 @@ class StyleGuide:
def handle_error( def handle_error(
self, self,
code, code: str,
filename, filename: str,
line_number, line_number: int,
column_number, column_number: Optional[int],
text, text: str,
physical_line=None, physical_line: Optional[str] = None,
): ) -> int:
# type: (str, str, int, Optional[int], str, Optional[str]) -> int
"""Handle an error reported by a check. """Handle an error reported by a check.
:param str code: :param str code:
@ -586,8 +581,7 @@ class StyleGuide:
return 1 return 1
return 0 return 0
def add_diff_ranges(self, diffinfo): def add_diff_ranges(self, diffinfo: Dict[str, Set[int]]) -> None:
# type: (Dict[str, Set[int]]) -> None
"""Update the StyleGuide to filter out information not in the diff. """Update the StyleGuide to filter out information not in the diff.
This provides information to the StyleGuide so that only the errors This provides information to the StyleGuide so that only the errors
@ -599,14 +593,15 @@ class StyleGuide:
self._parsed_diff = diffinfo self._parsed_diff = diffinfo
def find_more_specific(selected, ignored): # type: (str, str) -> Decision def find_more_specific(selected: str, ignored: str) -> Decision:
if selected.startswith(ignored) and selected != ignored: if selected.startswith(ignored) and selected != ignored:
return Decision.Selected return Decision.Selected
return Decision.Ignored return Decision.Ignored
def find_first_match(error_code, code_list): def find_first_match(
# type: (str, Tuple[str, ...]) -> Optional[str] error_code: str, code_list: Tuple[str, ...]
) -> Optional[str]:
startswith = error_code.startswith startswith = error_code.startswith
for code in code_list: for code in code_list:
if startswith(code): if startswith(code):

View file

@ -32,8 +32,9 @@ COMMA_SEPARATED_LIST_RE = re.compile(r"[,\s]")
LOCAL_PLUGIN_LIST_RE = re.compile(r"[,\t\n\r\f\v]") LOCAL_PLUGIN_LIST_RE = re.compile(r"[,\t\n\r\f\v]")
def parse_comma_separated_list(value, regexp=COMMA_SEPARATED_LIST_RE): def parse_comma_separated_list(
# type: (str, Pattern[str]) -> List[str] value: str, regexp: Pattern[str] = COMMA_SEPARATED_LIST_RE
) -> List[str]:
"""Parse a comma-separated list. """Parse a comma-separated list.
:param value: :param value:
@ -67,8 +68,7 @@ _FILE_LIST_TOKEN_TYPES = [
] ]
def _tokenize_files_to_codes_mapping(value): def _tokenize_files_to_codes_mapping(value: str) -> List[_Token]:
# type: (str) -> List[_Token]
tokens = [] tokens = []
i = 0 i = 0
while i < len(value): while i < len(value):
@ -85,8 +85,9 @@ def _tokenize_files_to_codes_mapping(value):
return tokens return tokens
def parse_files_to_codes_mapping(value_): # noqa: C901 def parse_files_to_codes_mapping( # noqa: C901
# type: (Union[Sequence[str], str]) -> List[Tuple[str, List[str]]] value_: Union[Sequence[str], str]
) -> List[Tuple[str, List[str]]]:
"""Parse a files-to-codes mapping. """Parse a files-to-codes mapping.
A files-to-codes mapping a sequence of values specified as A files-to-codes mapping a sequence of values specified as
@ -101,17 +102,17 @@ def parse_files_to_codes_mapping(value_): # noqa: C901
else: else:
value = value_ value = value_
ret = [] # type: List[Tuple[str, List[str]]] ret: List[Tuple[str, List[str]]] = []
if not value.strip(): if not value.strip():
return ret return ret
class State: class State:
seen_sep = True seen_sep = True
seen_colon = False seen_colon = False
filenames = [] # type: List[str] filenames: List[str] = []
codes = [] # type: List[str] codes: List[str] = []
def _reset(): # type: () -> None def _reset() -> None:
if State.codes: if State.codes:
for filename in State.filenames: for filename in State.filenames:
ret.append((filename, State.codes)) ret.append((filename, State.codes))
@ -120,8 +121,8 @@ def parse_files_to_codes_mapping(value_): # noqa: C901
State.filenames = [] State.filenames = []
State.codes = [] State.codes = []
def _unexpected_token(): # type: () -> exceptions.ExecutionError def _unexpected_token() -> exceptions.ExecutionError:
def _indent(s): # type: (str) -> str def _indent(s: str) -> str:
return " " + s.strip().replace("\n", "\n ") return " " + s.strip().replace("\n", "\n ")
return exceptions.ExecutionError( return exceptions.ExecutionError(
@ -163,8 +164,9 @@ def parse_files_to_codes_mapping(value_): # noqa: C901
return ret return ret
def normalize_paths(paths, parent=os.curdir): def normalize_paths(
# type: (Sequence[str], str) -> List[str] paths: Sequence[str], parent: str = os.curdir
) -> List[str]:
"""Normalize a list of paths relative to a parent directory. """Normalize a list of paths relative to a parent directory.
:returns: :returns:
@ -176,8 +178,7 @@ def normalize_paths(paths, parent=os.curdir):
return [normalize_path(p, parent) for p in paths] return [normalize_path(p, parent) for p in paths]
def normalize_path(path, parent=os.curdir): def normalize_path(path: str, parent: str = os.curdir) -> str:
# type: (str, str) -> str
"""Normalize a single-path. """Normalize a single-path.
:returns: :returns:
@ -199,7 +200,7 @@ def normalize_path(path, parent=os.curdir):
@functools.lru_cache(maxsize=1) @functools.lru_cache(maxsize=1)
def stdin_get_value(): # type: () -> str def stdin_get_value() -> str:
"""Get and cache it so plugins can use it.""" """Get and cache it so plugins can use it."""
stdin_value = sys.stdin.buffer.read() stdin_value = sys.stdin.buffer.read()
fd = io.BytesIO(stdin_value) fd = io.BytesIO(stdin_value)
@ -211,13 +212,12 @@ def stdin_get_value(): # type: () -> str
return stdin_value.decode("utf-8") return stdin_value.decode("utf-8")
def stdin_get_lines(): # type: () -> List[str] def stdin_get_lines() -> List[str]:
"""Return lines of stdin split according to file splitting.""" """Return lines of stdin split according to file splitting."""
return list(io.StringIO(stdin_get_value())) return list(io.StringIO(stdin_get_value()))
def parse_unified_diff(diff=None): def parse_unified_diff(diff: Optional[str] = None) -> Dict[str, Set[int]]:
# type: (Optional[str]) -> Dict[str, Set[int]]
"""Parse the unified diff passed on stdin. """Parse the unified diff passed on stdin.
:returns: :returns:
@ -231,7 +231,7 @@ def parse_unified_diff(diff=None):
number_of_rows = None number_of_rows = None
current_path = None current_path = None
parsed_paths = collections.defaultdict(set) # type: Dict[str, Set[int]] parsed_paths: Dict[str, Set[int]] = collections.defaultdict(set)
for line in diff.splitlines(): for line in diff.splitlines():
if number_of_rows: if number_of_rows:
# NOTE(sigmavirus24): Below we use a slice because stdin may be # NOTE(sigmavirus24): Below we use a slice because stdin may be
@ -289,8 +289,7 @@ def parse_unified_diff(diff=None):
return parsed_paths return parsed_paths
def is_windows(): def is_windows() -> bool:
# type: () -> bool
"""Determine if we're running on Windows. """Determine if we're running on Windows.
:returns: :returns:
@ -301,8 +300,7 @@ def is_windows():
return os.name == "nt" return os.name == "nt"
def is_using_stdin(paths): def is_using_stdin(paths: List[str]) -> bool:
# type: (List[str]) -> bool
"""Determine if we're going to read from stdin. """Determine if we're going to read from stdin.
:param list paths: :param list paths:
@ -315,12 +313,14 @@ def is_using_stdin(paths):
return "-" in paths return "-" in paths
def _default_predicate(*args): # type: (*str) -> bool def _default_predicate(*args: str) -> bool:
return False return False
def filenames_from(arg, predicate=None): def filenames_from(
# type: (str, Optional[Callable[[str], bool]]) -> Generator[str, None, None] # noqa: E501 arg: str, predicate: Optional[Callable[[str], bool]] = None
) -> Generator[str, None, None]:
# noqa: E501
"""Generate filenames from an argument. """Generate filenames from an argument.
:param str arg: :param str arg:
@ -360,8 +360,7 @@ def filenames_from(arg, predicate=None):
yield arg yield arg
def fnmatch(filename, patterns): def fnmatch(filename: str, patterns: Sequence[str]) -> bool:
# type: (str, Sequence[str]) -> bool
"""Wrap :func:`fnmatch.fnmatch` to add some functionality. """Wrap :func:`fnmatch.fnmatch` to add some functionality.
:param str filename: :param str filename:
@ -379,8 +378,7 @@ def fnmatch(filename, patterns):
return any(_fnmatch.fnmatch(filename, pattern) for pattern in patterns) return any(_fnmatch.fnmatch(filename, pattern) for pattern in patterns)
def parameters_for(plugin): def parameters_for(plugin: "Plugin") -> Dict[str, bool]:
# type: (Plugin) -> Dict[str, bool]
"""Return the parameters for the plugin. """Return the parameters for the plugin.
This will inspect the plugin and return either the function parameters This will inspect the plugin and return either the function parameters
@ -414,8 +412,12 @@ def parameters_for(plugin):
return parameters return parameters
def matches_filename(path, patterns, log_message, logger): def matches_filename(
# type: (str, Sequence[str], str, logging.Logger) -> bool path: str,
patterns: Sequence[str],
log_message: str,
logger: logging.Logger,
) -> bool:
"""Use fnmatch to discern if a path exists in patterns. """Use fnmatch to discern if a path exists in patterns.
:param str path: :param str path:
@ -447,7 +449,7 @@ def matches_filename(path, patterns, log_message, logger):
return match return match
def get_python_version(): # type: () -> str def get_python_version() -> str:
"""Find and format the python implementation and version. """Find and format the python implementation and version.
:returns: :returns: