Add type annotations for flake8.processor

Anthony Sottile 2019-09-07 23:31:04 -07:00
parent ae2f38fbeb
commit 92fbdda253
3 changed files with 47 additions and 32 deletions

View file

@@ -122,6 +122,8 @@ disallow_untyped_defs = true
 disallow_untyped_defs = true
 [mypy-flake8.main.cli]
 disallow_untyped_defs = true
+[mypy-flake8.processor]
+disallow_untyped_defs = true
 [mypy-flake8.statistics]
 disallow_untyped_defs = true
 [mypy-flake8.style_guide]

View file

@@ -1,9 +1,11 @@
 """Module containing our file processor that tokenizes a file for checks."""
+import argparse
+import ast
 import contextlib
 import logging
 import sys
 import tokenize
-from typing import Any, Dict, List, Tuple
+from typing import Any, Dict, Generator, List, Optional, Tuple

 import flake8
 from flake8 import defaults
@@ -18,6 +20,10 @@ SKIP_TOKENS = frozenset(
     [tokenize.NL, tokenize.NEWLINE, tokenize.INDENT, tokenize.DEDENT]
 )

+_Token = Tuple[int, str, Tuple[int, int], Tuple[int, int], str]
+_LogicalMapping = List[Tuple[int, Tuple[int, int]]]
+_Logical = Tuple[List[str], List[str], _LogicalMapping]
+

 class FileProcessor(object):
     """Processes a file and holdes state.
@@ -48,6 +54,7 @@ class FileProcessor(object):
     """

     def __init__(self, filename, options, lines=None):
+        # type: (str, argparse.Namespace, Optional[List[str]]) -> None
         """Initialice our file processor.

         :param str filename:
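The annotations throughout this commit use mypy's comment syntax (# type: ...) rather than inline annotations, presumably because flake8 still supported Python 2 at this point. Both spellings are equivalent to mypy; a minimal sketch with hypothetical names:

    from typing import List, Optional

    def repeat(word, times=None):
        # type: (str, Optional[int]) -> List[str]
        """Comment-style annotation: valid on Python 2 and 3."""
        return [word] * (times if times is not None else 1)

    def repeat_py3(word: str, times: Optional[int] = None) -> List[str]:
        """The same signature written inline (Python 3 only)."""
        return [word] * (times if times is not None else 1)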
@@ -55,9 +62,7 @@ class FileProcessor(object):
         """
         self.options = options
         self.filename = filename
-        self.lines = lines
-        if lines is None:
-            self.lines = self.read_lines()
+        self.lines = lines if lines is not None else self.read_lines()
         self.strip_utf_bom()

         # Defaults for public attributes
@@ -68,11 +73,11 @@ class FileProcessor(object):
         #: Checker states for each plugin?
         self._checker_states = {}  # type: Dict[str, Dict[Any, Any]]
         #: Current checker state
-        self.checker_state = None  # type: Dict[Any, Any]
+        self.checker_state = {}  # type: Dict[Any, Any]
         #: User provided option for hang closing
         self.hang_closing = options.hang_closing
         #: Character used for indentation
-        self.indent_char = None
+        self.indent_char = None  # type: Optional[str]
         #: Current level of indentation
         self.indent_level = 0
         #: Line number in the file
@@ -93,20 +98,18 @@ class FileProcessor(object):
         self.previous_logical = ""
         #: Previous unindented (i.e. top-level) logical line
         self.previous_unindented_logical_line = ""
-        # fmt: off
         #: Current set of tokens
-        self.tokens = []  # type: List[Tuple[int, str, Tuple[int, int], Tuple[int, int], str]]  # noqa: E501
-        # fmt: on
+        self.tokens = []  # type: List[_Token]
         #: Total number of lines in the file
         self.total_lines = len(self.lines)
         #: Verbosity level of Flake8
         self.verbose = options.verbose
         #: Statistics dictionary
         self.statistics = {"logical lines": 0}
-        self._file_tokens = None
+        self._file_tokens = None  # type: Optional[List[_Token]]

     @property
-    def file_tokens(self):
+    def file_tokens(self):  # type: () -> List[_Token]
         """Return the complete set of tokens for a file.

         Accessing this attribute *may* raise an InvalidSyntax exception.
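Typing _file_tokens as Optional[List[_Token]] is the standard lazy-cache pattern: the attribute starts out None and the file_tokens property presumably fills it on first access. A generic sketch of the pattern (assumed, not flake8's exact code):

    from typing import List, Optional

    class Cached(object):
        def __init__(self):
            # type: () -> None
            self._values = None  # type: Optional[List[int]]

        @property
        def values(self):  # type: () -> List[int]
            # After the None check, mypy narrows Optional[List[int]] to
            # List[int], so the declared return type holds.
            if self._values is None:
                self._values = [1, 2, 3]
            return self._values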
@@ -126,25 +129,27 @@ class FileProcessor(object):

     @contextlib.contextmanager
     def inside_multiline(self, line_number):
+        # type: (int) -> Generator[None, None, None]
         """Context-manager to toggle the multiline attribute."""
         self.line_number = line_number
         self.multiline = True
         yield
         self.multiline = False

-    def reset_blank_before(self):
+    def reset_blank_before(self):  # type: () -> None
         """Reset the blank_before attribute to zero."""
         self.blank_before = 0

-    def delete_first_token(self):
+    def delete_first_token(self):  # type: () -> None
         """Delete the first token in the list of tokens."""
         del self.tokens[0]

-    def visited_new_blank_line(self):
+    def visited_new_blank_line(self):  # type: () -> None
         """Note that we visited a new blank line."""
         self.blank_lines += 1

     def update_state(self, mapping):
+        # type: (_LogicalMapping) -> None
         """Update the indent level based on the logical line mapping."""
         (start_row, start_col) = mapping[0][1]
         start_line = self.lines[start_row - 1]
@@ -153,13 +158,14 @@ class FileProcessor(object):
         self.blank_before = self.blank_lines

     def update_checker_state_for(self, plugin):
+        # type: (Dict[str, Any]) -> None
         """Update the checker_state attribute for the plugin."""
         if "checker_state" in plugin["parameters"]:
             self.checker_state = self._checker_states.setdefault(
                 plugin["name"], {}
             )

-    def next_logical_line(self):
+    def next_logical_line(self):  # type: () -> None
         """Record the previous logical line.

         This also resets the tokens list and the blank_lines count.
@@ -173,12 +179,13 @@ class FileProcessor(object):
         self.tokens = []
         self.noqa = False

-    def build_logical_line_tokens(self):
+    def build_logical_line_tokens(self):  # type: () -> _Logical
         """Build the mapping, comments, and logical line lists."""
         logical = []
         comments = []
+        mapping = []  # type: _LogicalMapping
         length = 0
-        previous_row = previous_column = mapping = None
+        previous_row = previous_column = None
         for token_type, text, start, end, line in self.tokens:
             if token_type in SKIP_TOKENS:
                 continue
@@ -207,11 +214,12 @@ class FileProcessor(object):
             (previous_row, previous_column) = end
         return comments, logical, mapping

-    def build_ast(self):
+    def build_ast(self):  # type: () -> ast.AST
         """Build an abstract syntax tree from the list of lines."""
-        return compile("".join(self.lines), "", "exec", PyCF_ONLY_AST)
+        return ast.parse("".join(self.lines))

     def build_logical_line(self):
+        # type: () -> Tuple[str, str, _LogicalMapping]
         """Build a logical line from the current tokens list."""
         comments, logical, mapping_list = self.build_logical_line_tokens()
         joined_comments = "".join(comments)
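The build_ast change is behavior-preserving: the stdlib documents ast.parse(source) as equivalent to compile(source, '<unknown>', 'exec', ast.PyCF_ONLY_AST), and it gives mypy a precise ast.AST return type where compile returns Any. The equivalence is easy to check:

    import ast

    source = "x = 1\n"
    via_parse = ast.parse(source)
    via_compile = compile(source, "<unknown>", "exec", ast.PyCF_ONLY_AST)
    # Both produce an ast.Module with the same structure.
    assert ast.dump(via_parse) == ast.dump(via_compile)
    assert isinstance(via_parse, ast.AST)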
@@ -222,6 +230,7 @@ class FileProcessor(object):
         return joined_comments, self.logical_line, mapping_list

     def split_line(self, token):
+        # type: (_Token) -> Generator[str, None, None]
         """Split a physical line's line based on new-lines.

         This also auto-increments the line number for the caller.
@@ -231,6 +240,7 @@ class FileProcessor(object):
             self.line_number += 1

     def keyword_arguments_for(self, parameters, arguments=None):
+        # type: (Dict[str, bool], Optional[Dict[str, Any]]) -> Dict[str, Any]
         """Generate the keyword arguments for a list of parameters."""
         if arguments is None:
             arguments = {}
@@ -252,11 +262,12 @@ class FileProcessor(object):
         return arguments

     def check_physical_error(self, error_code, line):
+        # type: (str, str) -> None
         """Update attributes based on error code and line."""
         if error_code == "E101":
             self.indent_char = line[0]

-    def generate_tokens(self):
+    def generate_tokens(self):  # type: () -> Generator[_Token, None, None]
         """Tokenize the file and yield the tokens.

         :raises flake8.exceptions.InvalidSyntax:
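The Generator[_Token, None, None] comment on generate_tokens spells out the three type parameters of a generator: the yield type, the send type, and the return type. A generator that is only iterated, like this one, uses None for the last two. A small self-contained example:

    from typing import Generator

    def countdown(n):
        # type: (int) -> Generator[int, None, None]
        # Yields ints; nothing is sent in via send() and nothing is
        # returned, hence Generator[int, None, None].
        while n > 0:
            yield n
            n -= 1

    assert list(countdown(3)) == [3, 2, 1]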
@@ -273,6 +284,7 @@ class FileProcessor(object):
             raise exceptions.InvalidSyntax(exception=exc)

     def line_for(self, line_number):
+        # type: (int) -> Optional[str]
         """Retrieve the physical line at the specified line number."""
         adjusted_line_number = line_number - 1
         # NOTE(sigmavirus24): Some plugins choose to report errors for empty
@@ -282,7 +294,7 @@ class FileProcessor(object):
             return self.lines[adjusted_line_number]
         return None

-    def next_line(self):
+    def next_line(self):  # type: () -> str
         """Get the next line from the list."""
         if self.line_number >= self.total_lines:
             return ""
@@ -371,17 +383,17 @@ class FileProcessor(object):
         self.lines[0] = self.lines[0][3:]


-def is_eol_token(token):
+def is_eol_token(token):  # type: (_Token) -> bool
     """Check if the token is an end-of-line token."""
     return token[0] in NEWLINE or token[4][token[3][1] :].lstrip() == "\\\n"


-def is_multiline_string(token):
+def is_multiline_string(token):  # type: (_Token) -> bool
     """Check if this is a multiline string."""
     return token[0] == tokenize.STRING and "\n" in token[1]


-def token_is_newline(token):
+def token_is_newline(token):  # type: (_Token) -> bool
     """Check if the token type is a newline token type."""
     return token[0] in NEWLINE

@@ -396,7 +408,7 @@ def count_parentheses(current_parentheses_count, token_text):
     return current_parentheses_count


-def log_token(log, token):
+def log_token(log, token):  # type: (logging.Logger, _Token) -> None
     """Log a token to a provided logging object."""
     if token[2][0] == token[3][0]:
         pos = "[%s:%s]" % (token[2][1] or "", token[3][1])
@@ -411,7 +423,7 @@ def log_token(log, token):

 # NOTE(sigmavirus24): This was taken wholesale from
 # https://github.com/PyCQA/pycodestyle
-def expand_indent(line):
+def expand_indent(line):  # type: (str) -> int
     r"""Return the amount of indentation.

     Tabs are expanded to the next multiple of 8.
@@ -441,14 +453,14 @@ def expand_indent(line):
 # NOTE(sigmavirus24): This was taken wholesale from
 # https://github.com/PyCQA/pycodestyle. The in-line comments were edited to be
 # more descriptive.
-def mutate_string(text):
+def mutate_string(text):  # type: (str) -> str
     """Replace contents with 'xxx' to prevent syntax matching.

-    >>> mute_string('"abc"')
+    >>> mutate_string('"abc"')
     '"xxx"'
-    >>> mute_string("'''abc'''")
+    >>> mutate_string("'''abc'''")
     "'''xxx'''"
-    >>> mute_string("r'abc'")
+    >>> mutate_string("r'abc'")
     "r'xxx'"
     """
     # NOTE(sigmavirus24): If there are string modifiers (e.g., b, u, r)
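The doctest fix above is worth noting: flake8's copy of this pycodestyle helper is named mutate_string, but its doctests still called pycodestyle's original name mute_string and would fail with a NameError if actually run. For illustration, a minimal reimplementation that satisfies the corrected doctests (not flake8's actual code, which handles string prefixes differently):

    def mutate_string(text):  # type: (str) -> str
        """Replace a string literal's contents with 'x' characters."""
        for quote in ('"""', "'''", '"', "'"):
            if text.endswith(quote):
                # Keep any prefix (r, b, u) and the quotes; mask the body.
                start = text.index(quote) + len(quote)
                body_len = len(text) - start - len(quote)
                return text[:start] + 'x' * body_len + text[-len(quote):]
        return text

    assert mutate_string('"abc"') == '"xxx"'
    assert mutate_string("'''abc'''") == "'''xxx'''"
    assert mutate_string("r'abc'") == "r'xxx'"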

View file

@@ -196,7 +196,7 @@ def test_keyword_arguments_for_does_not_handle_attribute_errors(
     ])

     with pytest.raises(AttributeError):
-        file_processor.keyword_arguments_for(['fake'])
+        file_processor.keyword_arguments_for({'fake': True})


 @pytest.mark.parametrize('unsplit_line, expected_lines', [
@@ -211,7 +211,8 @@ def test_split_line(unsplit_line, expected_lines, default_options):
         'Line 1',
     ])

-    actual_lines = list(file_processor.split_line((1, unsplit_line)))
+    token = (1, unsplit_line, (0, 0), (0, 0), '')
+    actual_lines = list(file_processor.split_line(token))
     assert expected_lines == actual_lines
     assert len(actual_lines) == file_processor.line_number