Move processor to its own module

This commit is contained in:
Ian Cordasco 2016-03-04 20:03:05 -06:00
parent 23c9091b1a
commit f7a8b7ade7
2 changed files with 314 additions and 265 deletions

View file

@ -1,6 +1,4 @@
"""Checker Manager and Checker classes.""" """Checker Manager and Checker classes."""
import contextlib
import io
import logging import logging
import os import os
import sys import sys
@ -11,15 +9,12 @@ try:
except ImportError: except ImportError:
multiprocessing = None multiprocessing = None
from flake8 import defaults
from flake8 import exceptions from flake8 import exceptions
from flake8 import processor
from flake8 import utils from flake8 import utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
SKIP_TOKENS = frozenset([tokenize.NL, tokenize.NEWLINE, tokenize.INDENT,
tokenize.DEDENT])
class Manager(object): class Manager(object):
"""Manage the parallelism and checker instances for each plugin and file. """Manage the parallelism and checker instances for each plugin and file.
@ -190,7 +185,8 @@ class FileChecker(object):
def _make_processor(self): def _make_processor(self):
try: try:
return FileProcessor(self.filename, self.style_guide.options) return processor.FileProcessor(self.filename,
self.style_guide.options)
except IOError: except IOError:
# If we can not read the file due to an IOError (e.g., the file # If we can not read the file due to an IOError (e.g., the file
# does not exist or we do not have the permissions to open it) # does not exist or we do not have the permissions to open it)
@ -252,15 +248,6 @@ class FileChecker(object):
self.processor.check_physical_error(error_code, physical_line) self.processor.check_physical_error(error_code, physical_line)
def _log_token(self, token):
if token[2][0] == token[3][0]:
pos = '[%s:%s]' % (token[2][1] or '', token[3][1])
else:
pos = 'l.%s' % token[3][0]
LOG.debug('l.%s\t%s\t%s\t%r' %
(token[2][0], pos, tokenize.tok_name[token[0]],
token[1]))
def process_tokens(self): def process_tokens(self):
"""Process tokens and trigger checks. """Process tokens and trigger checks.
@ -269,23 +256,23 @@ class FileChecker(object):
:meth:`flake8.checker.FileChecker.run_checks`. :meth:`flake8.checker.FileChecker.run_checks`.
""" """
parens = 0 parens = 0
processor = self.processor file_processor = self.processor
for token in processor.generate_tokens(): for token in file_processor.generate_tokens():
self.check_physical_eol(token) self.check_physical_eol(token)
token_type, text = token[0:2] token_type, text = token[0:2]
self._log_token(token) processor.log_token(token)
if token_type == tokenize.OP: if token_type == tokenize.OP:
parens = utils.count_parentheses(parens, text) parens = processor.count_parentheses(parens, text)
elif parens == 0: elif parens == 0:
if utils.token_is_newline(token): if processor.token_is_newline(token):
self.handle_newline(token_type) self.handle_newline(token_type)
elif (utils.token_is_comment(token) and elif (processor.token_is_comment(token) and
len(processor.tokens) == 1): len(file_processor.tokens) == 1):
self.handle_comment(token, text) self.handle_comment(token, text)
if processor.tokens: if file_processor.tokens:
# If any tokens are left over, process them # If any tokens are left over, process them
self.run_physical_checks(processor.lines[-1]) self.run_physical_checks(file_processor.lines[-1])
self.run_logical_checks() self.run_logical_checks()
def run_checks(self): def run_checks(self):
@ -319,10 +306,10 @@ class FileChecker(object):
def check_physical_eol(self, token): def check_physical_eol(self, token):
"""Run physical checks if and only if it is at the end of the line.""" """Run physical checks if and only if it is at the end of the line."""
if utils.is_eol_token(token): if processor.is_eol_token(token):
# Obviously, a newline token ends a single physical line. # Obviously, a newline token ends a single physical line.
self.run_physical_checks(token[4]) self.run_physical_checks(token[4])
elif utils.is_multiline_string(token): elif processor.is_multiline_string(token):
# Less obviously, a string that contains newlines is a # Less obviously, a string that contains newlines is a
# multiline string, either triple-quoted or with internal # multiline string, either triple-quoted or with internal
# newlines backslash-escaped. Check every physical line in the # newlines backslash-escaped. Check every physical line in the
@ -338,241 +325,3 @@ class FileChecker(object):
with self.processor.inside_multiline(line_number=line_no): with self.processor.inside_multiline(line_number=line_no):
for line in self.processor.split_line(token): for line in self.processor.split_line(token):
self.run_physical_checks(line + '\n') self.run_physical_checks(line + '\n')
class FileProcessor(object):
"""Processes a file and holdes state.
This processes a file by generating tokens, logical and physical lines,
and AST trees. This also provides a way of passing state about the file
to checks expecting that state. Any public attribute on this object can
be requested by a plugin. The known public attributes are:
- blank_before
- blank_lines
- indent_char
- indent_level
- line_number
- logical_line
- max_line_length
- multiline
- noqa
- previous_indent_level
- previous_logical
- tokens
- total_lines
- verbose
"""
def __init__(self, filename, options):
"""Initialice our file processor.
:param str filename:
Name of the file to process
"""
self.filename = filename
self.lines = self.read_lines()
self.strip_utf_bom()
self.options = options
# Defaults for public attributes
#: Number of preceding blank lines
self.blank_before = 0
#: Number of blank lines
self.blank_lines = 0
#: Character used for indentation
self.indent_char = None
#: Current level of indentation
self.indent_level = 0
#: Line number in the file
self.line_number = 0
#: Current logical line
self.logical_line = ''
#: Maximum line length as configured by the user
self.max_line_length = options.max_line_length
#: Whether the current physical line is multiline
self.multiline = False
#: Whether or not we're observing NoQA
self.noqa = False
#: Previous level of indentation
self.previous_indent_level = 0
#: Previous logical line
self.previous_logical = ''
#: Current set of tokens
self.tokens = []
#: Total number of lines in the file
self.total_lines = len(self.lines)
#: Verbosity level of Flake8
self.verbosity = options.verbosity
@contextlib.contextmanager
def inside_multiline(self, line_number):
"""Context-manager to toggle the multiline attribute."""
self.line_number = line_number
self.multiline = True
yield
self.multiline = False
def reset_blank_before(self):
"""Reset the blank_before attribute to zero."""
self.blank_before = 0
def delete_first_token(self):
"""Delete the first token in the list of tokens."""
del self.tokens[0]
def visited_new_blank_line(self):
"""Note that we visited a new blank line."""
self.blank_lines += 1
def build_logical_line_tokens(self):
"""Build the mapping, comments, and logical line lists."""
logical = []
comments = []
length = 0
previous_row = previous_column = mapping = None
for token_type, text, start, end, line in self.tokens:
if token_type in SKIP_TOKENS:
continue
if not mapping:
mapping = [(0, start)]
if token_type == tokenize.COMMENT:
comments.append(text)
continue
if token_type == tokenize.STRING:
text = utils.mutate_string(text)
if previous_row:
(start_row, start_column) = start
if previous_row != start_row:
row_index = previous_row - 1
column_index = previous_column - 1
previous_text = self.lines[row_index][column_index]
if (previous_text == ',' or
(previous_text not in '{[(' and
text not in '}])')):
text = ' ' + text
elif previous_column != start_column:
text = line[previous_column:start_column] + text
logical.append(text)
length += len(text)
mapping.append((length, end))
(previous_row, previous_column) = end
return comments, logical, mapping
def build_logical_line(self):
"""Build a logical line from the current tokens list."""
comments, logical, mapping_list = self.build_logical_line_tokens()
return ''.join(comments), ''.join(logical), mapping_list
def split_line(self, token):
"""Split a physical line's line based on new-lines.
This also auto-increments the line number for the caller.
"""
for line in token[1].split('\n')[:-1]:
yield line
self.line_number += 1
def keyword_arguments_for(self, parameters, arguments=None):
"""Generate the keyword arguments for a list of parameters."""
if arguments is None:
arguments = {}
for param in parameters:
if param not in arguments:
arguments[param] = getattr(self, param)
return arguments
def check_physical_error(self, error_code, line):
"""Update attributes based on error code and line."""
if error_code == 'E101':
self.indent_char = line[0]
def generate_tokens(self):
"""Tokenize the file and yield the tokens.
:raises flake8.exceptions.InvalidSyntax:
If a :class:`tokenize.TokenError` is raised while generating
tokens.
"""
try:
for token in tokenize.generate_tokens(self.next_line):
if token[2][0] > self.total_lines:
break
self.tokens.append(token)
yield token
# NOTE(sigmavirus24): pycodestyle was catching both a SyntaxError
# and a tokenize.TokenError. In looking at the source on Python 2 and
# Python 3, the SyntaxError should never arise from generate_tokens.
# If we were using tokenize.tokenize, we would have to catch that. Of
# course, I'm going to be unsurprised to be proven wrong at a later
# date.
except tokenize.TokenError as exc:
raise exceptions.InvalidSyntax(exc.message, exception=exc)
def next_line(self):
"""Get the next line from the list."""
if self.line_number >= self.total_lines:
return ''
line = self.lines[self.line_number]
self.line_number += 1
if self.indent_char is None and line[:1] in defaults.WHITESPACE:
self.indent_char = line[0]
return line
def read_lines(self):
# type: () -> List[str]
"""Read the lines for this file checker."""
if self.filename is None or self.filename == '-':
self.filename = 'stdin'
return self.read_lines_from_stdin()
return self.read_lines_from_filename()
def _readlines_py2(self):
# type: () -> List[str]
with open(self.filename, 'rU') as fd:
return fd.readlines()
def _readlines_py3(self):
# type: () -> List[str]
try:
with open(self.filename, 'rb') as fd:
(coding, lines) = tokenize.detect_encoding(fd.readline)
textfd = io.TextIOWrapper(fd, coding, line_buffering=True)
return ([l.decode(coding) for l in lines] +
textfd.readlines())
except (LookupError, SyntaxError, UnicodeError):
# If we can't detect the codec with tokenize.detect_encoding, or
# the detected encoding is incorrect, just fallback to latin-1.
with open(self.filename, encoding='latin-1') as fd:
return fd.readlines()
def read_lines_from_filename(self):
# type: () -> List[str]
"""Read the lines for a file."""
if (2, 6) <= sys.version_info < (3, 0):
readlines = self._readlines_py2
elif (3, 0) <= sys.version_info < (4, 0):
readlines = self._readlines_py3
return readlines()
def read_lines_from_stdin(self):
# type: () -> List[str]
"""Read the lines from standard in."""
return utils.stdin_get_value().splitlines(True)
def strip_utf_bom(self):
# type: () -> NoneType
"""Strip the UTF bom from the lines of the file."""
if not self.lines:
# If we have nothing to analyze quit early
return
first_byte = ord(self.lines[0][0])
if first_byte not in (0xEF, 0xFEFF):
return
# If the first byte of the file is a UTF-8 BOM, strip it
if first_byte == 0xFEFF:
self.lines[0] = self.lines[0][1:]
elif self.lines[0][:3] == '\xEF\xBB\xBF':
self.lines[0] = self.lines[0][3:]

300
flake8/processor.py Normal file
View file

@ -0,0 +1,300 @@
"""Module containing our file processor that tokenizes a file for checks."""
import contextlib
import io
import sys
import tokenize
from flake8 import defaults
from flake8 import exceptions
from flake8 import utils
NEWLINE = frozenset([tokenize.NL, tokenize.NEWLINE])
# Work around Python < 2.6 behaviour, which does not generate NL after
# a comment which is on a line by itself.
COMMENT_WITH_NL = tokenize.generate_tokens(['#\n'].pop).send(None)[1] == '#\n'
SKIP_TOKENS = frozenset([tokenize.NL, tokenize.NEWLINE, tokenize.INDENT,
tokenize.DEDENT])
class FileProcessor(object):
    """Processes a file and holds state about it.

    This processes a file by generating tokens, logical and physical lines,
    and AST trees. This also provides a way of passing state about the file
    to checks expecting that state. Any public attribute on this object can
    be requested by a plugin. The known public attributes are:

    - blank_before
    - blank_lines
    - indent_char
    - indent_level
    - line_number
    - logical_line
    - max_line_length
    - multiline
    - noqa
    - previous_indent_level
    - previous_logical
    - tokens
    - total_lines
    - verbosity
    """

    def __init__(self, filename, options):
        """Initialize our file processor.

        :param str filename:
            Name of the file to process.
        :param options:
            Parsed option values; must provide ``max_line_length`` and
            ``verbosity`` attributes.
        """
        self.filename = filename
        self.lines = self.read_lines()
        self.strip_utf_bom()
        self.options = options
        # Defaults for public attributes
        #: Number of preceding blank lines
        self.blank_before = 0
        #: Number of blank lines
        self.blank_lines = 0
        #: Character used for indentation
        self.indent_char = None
        #: Current level of indentation
        self.indent_level = 0
        #: Line number in the file
        self.line_number = 0
        #: Current logical line
        self.logical_line = ''
        #: Maximum line length as configured by the user
        self.max_line_length = options.max_line_length
        #: Whether the current physical line is multiline
        self.multiline = False
        #: Whether or not we're observing NoQA
        self.noqa = False
        #: Previous level of indentation
        self.previous_indent_level = 0
        #: Previous logical line
        self.previous_logical = ''
        #: Current set of tokens
        self.tokens = []
        #: Total number of lines in the file
        self.total_lines = len(self.lines)
        #: Verbosity level of Flake8
        self.verbosity = options.verbosity

    @contextlib.contextmanager
    def inside_multiline(self, line_number):
        """Context-manager to toggle the multiline attribute."""
        self.line_number = line_number
        self.multiline = True
        yield
        self.multiline = False

    def reset_blank_before(self):
        """Reset the blank_before attribute to zero."""
        self.blank_before = 0

    def delete_first_token(self):
        """Delete the first token in the list of tokens."""
        del self.tokens[0]

    def visited_new_blank_line(self):
        """Note that we visited a new blank line."""
        self.blank_lines += 1

    def build_logical_line_tokens(self):
        """Build the mapping, comments, and logical line lists."""
        logical = []
        comments = []
        length = 0
        previous_row = previous_column = mapping = None
        for token_type, text, start, end, line in self.tokens:
            if token_type in SKIP_TOKENS:
                continue
            if not mapping:
                mapping = [(0, start)]
            if token_type == tokenize.COMMENT:
                comments.append(text)
                continue
            if token_type == tokenize.STRING:
                # Normalize string contents so checks see a stable form.
                text = utils.mutate_string(text)
            if previous_row:
                (start_row, start_column) = start
                if previous_row != start_row:
                    # The token starts on a new physical line; join it to the
                    # logical line with a single space unless the surrounding
                    # punctuation makes the space unnecessary.
                    row_index = previous_row - 1
                    column_index = previous_column - 1
                    previous_text = self.lines[row_index][column_index]
                    if (previous_text == ',' or
                            (previous_text not in '{[(' and
                             text not in '}])')):
                        text = ' ' + text
                elif previous_column != start_column:
                    # Preserve intra-line whitespace between tokens.
                    text = line[previous_column:start_column] + text
            logical.append(text)
            length += len(text)
            mapping.append((length, end))
            (previous_row, previous_column) = end
        return comments, logical, mapping

    def build_logical_line(self):
        """Build a logical line from the current tokens list."""
        comments, logical, mapping_list = self.build_logical_line_tokens()
        return ''.join(comments), ''.join(logical), mapping_list

    def split_line(self, token):
        """Split a physical line's line based on new-lines.

        This also auto-increments the line number for the caller.
        """
        # The final element after split('\n') is either '' (line ended with
        # a newline) or a trailing fragment handled elsewhere, so skip it.
        for line in token[1].split('\n')[:-1]:
            yield line
            self.line_number += 1

    def keyword_arguments_for(self, parameters, arguments=None):
        """Generate the keyword arguments for a list of parameters.

        :param parameters: Iterable of attribute names a plugin requested.
        :param dict arguments: Optional pre-populated mapping; existing keys
            are never overwritten.
        :returns: Mapping of parameter name to this processor's attribute.
        """
        if arguments is None:
            arguments = {}
        for param in parameters:
            if param not in arguments:
                arguments[param] = getattr(self, param)
        return arguments

    def check_physical_error(self, error_code, line):
        """Update attributes based on error code and line."""
        # E101: indentation contains mixed spaces and tabs; remember what
        # this file actually indents with.
        if error_code == 'E101':
            self.indent_char = line[0]

    def generate_tokens(self):
        """Tokenize the file and yield the tokens.

        :raises flake8.exceptions.InvalidSyntax:
            If a :class:`tokenize.TokenError` is raised while generating
            tokens.
        """
        try:
            for token in tokenize.generate_tokens(self.next_line):
                if token[2][0] > self.total_lines:
                    break
                self.tokens.append(token)
                yield token
        # NOTE(sigmavirus24): pycodestyle was catching both a SyntaxError
        # and a tokenize.TokenError. In looking at the source on Python 2 and
        # Python 3, the SyntaxError should never arise from generate_tokens.
        # If we were using tokenize.tokenize, we would have to catch that. Of
        # course, I'm going to be unsurprised to be proven wrong at a later
        # date.
        except tokenize.TokenError as exc:
            # ``exc.message`` does not exist on Python 3 (PEP 352 removed
            # BaseException.message); args[0] carries the message on both
            # Python 2 and Python 3.
            raise exceptions.InvalidSyntax(exc.args[0], exception=exc)

    def next_line(self):
        """Get the next line from the list."""
        if self.line_number >= self.total_lines:
            return ''
        line = self.lines[self.line_number]
        self.line_number += 1
        # Remember the first indentation character we encounter so checks
        # can compare subsequent lines against it.
        if self.indent_char is None and line[:1] in defaults.WHITESPACE:
            self.indent_char = line[0]
        return line

    def read_lines(self):
        # type: () -> List[str]
        """Read the lines for this file checker."""
        if self.filename is None or self.filename == '-':
            self.filename = 'stdin'
            return self.read_lines_from_stdin()
        return self.read_lines_from_filename()

    def _readlines_py2(self):
        # type: () -> List[str]
        # Universal-newline mode; returns byte strings on Python 2.
        with open(self.filename, 'rU') as fd:
            return fd.readlines()

    def _readlines_py3(self):
        # type: () -> List[str]
        try:
            with open(self.filename, 'rb') as fd:
                (coding, lines) = tokenize.detect_encoding(fd.readline)
                textfd = io.TextIOWrapper(fd, coding, line_buffering=True)
                return ([l.decode(coding) for l in lines] +
                        textfd.readlines())
        except (LookupError, SyntaxError, UnicodeError):
            # If we can't detect the codec with tokenize.detect_encoding, or
            # the detected encoding is incorrect, just fallback to latin-1.
            with open(self.filename, encoding='latin-1') as fd:
                return fd.readlines()

    def read_lines_from_filename(self):
        # type: () -> List[str]
        """Read the lines for a file."""
        # Previously this only handled 2.6 <= version < 4.0 and would raise
        # UnboundLocalError for anything outside that range; default to the
        # Python 3 reader for every non-Python-2 interpreter instead.
        if sys.version_info < (3, 0):
            readlines = self._readlines_py2
        else:
            readlines = self._readlines_py3
        return readlines()

    def read_lines_from_stdin(self):
        # type: () -> List[str]
        """Read the lines from standard in."""
        return utils.stdin_get_value().splitlines(True)

    def strip_utf_bom(self):
        # type: () -> NoneType
        """Strip the UTF bom from the lines of the file."""
        if not self.lines:
            # If we have nothing to analyze quit early
            return
        # 0xFEFF is the decoded BOM code point (Python 3 text); 0xEF is the
        # first byte of a raw UTF-8 BOM (Python 2 byte strings).
        first_byte = ord(self.lines[0][0])
        if first_byte not in (0xEF, 0xFEFF):
            return
        # If the first byte of the file is a UTF-8 BOM, strip it
        if first_byte == 0xFEFF:
            self.lines[0] = self.lines[0][1:]
        elif self.lines[0][:3] == '\xEF\xBB\xBF':
            self.lines[0] = self.lines[0][3:]
def is_eol_token(token):
    """Check if the token is an end-of-line token."""
    # A token ends a physical line when it is an NL/NEWLINE token, or when
    # the rest of the physical line after the token is only an escaped
    # newline (a backslash line continuation).
    return token[0] in NEWLINE or token[4][token[3][1]:].lstrip() == '\\\n'


if COMMENT_WITH_NL:  # If on Python 2.6
    # Older tokenizers emit no NL token after a comment that sits on a line
    # by itself, so a COMMENT whose text equals the whole physical line also
    # terminates the line. This deliberately shadows the definition above,
    # capturing it via a default argument so it can still delegate to it.
    def is_eol_token(token, _is_eol_token=is_eol_token):
        """Check if the token is an end-of-line token."""
        return (_is_eol_token(token) or
                (token[0] == tokenize.COMMENT and token[1] == token[4]))
def is_multiline_string(token):
    """Check if this is a multiline string."""
    token_type, text = token[0], token[1]
    if token_type != tokenize.STRING:
        return False
    return '\n' in text
def token_is_newline(token):
    """Check if the token type is a newline token type."""
    token_type = token[0]
    return token_type in NEWLINE
def token_is_comment(token):
    """Check if the token type is a comment."""
    # Only meaningful on interpreters whose tokenizer fails to emit an NL
    # after a standalone comment (see COMMENT_WITH_NL).
    if not COMMENT_WITH_NL:
        return False
    return token[0] == tokenize.COMMENT
def count_parentheses(current_parentheses_count, token_text):
    """Count the number of parentheses.

    :param int current_parentheses_count: The running nesting depth.
    :param str token_text: Text of the current OP token.
    :returns: The updated nesting depth.
    :rtype: int
    """
    # Use explicit sets of bracket characters: ``token_text in '([{'`` is a
    # substring test (and '' in '([{' is True), and the original fell
    # through to an implicit ``return None`` for any other operator (e.g.
    # ','), corrupting the caller's counter.
    if token_text in {'(', '[', '{'}:
        return current_parentheses_count + 1
    elif token_text in {')', ']', '}'}:
        return current_parentheses_count - 1
    return current_parentheses_count
def log_token(log, token):
    """Log a token to a provided logging object."""
    token_type, text = token[0], token[1]
    (start_row, start_col) = token[2]
    (end_row, end_col) = token[3]
    # Single-line tokens show a [start:end] column span; multi-line tokens
    # show the line on which they end.
    if start_row == end_row:
        position = '[%s:%s]' % (start_col or '', end_col)
    else:
        position = 'l.%s' % end_row
    log.debug('l.%s\t%s\t%s\t%r' % (start_row, position,
                                    tokenize.tok_name[token_type], text))