diff --git a/flake8/checker.py b/flake8/checker.py
index 82f9e79..69bab17 100644
--- a/flake8/checker.py
+++ b/flake8/checker.py
@@ -1,6 +1,4 @@
 """Checker Manager and Checker classes."""
-import contextlib
-import io
 import logging
 import os
 import sys
@@ -11,15 +9,12 @@ try:
 except ImportError:
     multiprocessing = None
 
-from flake8 import defaults
 from flake8 import exceptions
+from flake8 import processor
 from flake8 import utils
 
 LOG = logging.getLogger(__name__)
 
-SKIP_TOKENS = frozenset([tokenize.NL, tokenize.NEWLINE, tokenize.INDENT,
-                         tokenize.DEDENT])
-
 
 class Manager(object):
     """Manage the parallelism and checker instances for each plugin and file.
@@ -190,7 +185,8 @@ class FileChecker(object):
 
     def _make_processor(self):
         try:
-            return FileProcessor(self.filename, self.style_guide.options)
+            return processor.FileProcessor(self.filename,
+                                           self.style_guide.options)
         except IOError:
             # If we can not read the file due to an IOError (e.g., the file
             # does not exist or we do not have the permissions to open it)
@@ -252,15 +248,6 @@ class FileChecker(object):
 
         self.processor.check_physical_error(error_code, physical_line)
 
-    def _log_token(self, token):
-        if token[2][0] == token[3][0]:
-            pos = '[%s:%s]' % (token[2][1] or '', token[3][1])
-        else:
-            pos = 'l.%s' % token[3][0]
-        LOG.debug('l.%s\t%s\t%s\t%r' %
-                  (token[2][0], pos, tokenize.tok_name[token[0]],
-                   token[1]))
-
     def process_tokens(self):
         """Process tokens and trigger checks.
 
         Instead of using this directly, you should use
         :meth:`flake8.checker.FileChecker.run_checks`.
         """
         parens = 0
-        processor = self.processor
-        for token in processor.generate_tokens():
+        file_processor = self.processor
+        for token in file_processor.generate_tokens():
             self.check_physical_eol(token)
             token_type, text = token[0:2]
-            self._log_token(token)
+            processor.log_token(LOG, token)
             if token_type == tokenize.OP:
-                parens = utils.count_parentheses(parens, text)
+                parens = processor.count_parentheses(parens, text)
             elif parens == 0:
-                if utils.token_is_newline(token):
+                if processor.token_is_newline(token):
                     self.handle_newline(token_type)
-                elif (utils.token_is_comment(token) and
-                      len(processor.tokens) == 1):
+                elif (processor.token_is_comment(token) and
+                      len(file_processor.tokens) == 1):
                     self.handle_comment(token, text)
 
-        if processor.tokens:
+        if file_processor.tokens:
             # If any tokens are left over, process them
-            self.run_physical_checks(processor.lines[-1])
+            self.run_physical_checks(file_processor.lines[-1])
             self.run_logical_checks()
 
     def run_checks(self):
@@ -319,10 +306,10 @@ class FileChecker(object):
 
     def check_physical_eol(self, token):
         """Run physical checks if and only if it is at the end of the line."""
-        if utils.is_eol_token(token):
+        if processor.is_eol_token(token):
            # Obviously, a newline token ends a single physical line.
             self.run_physical_checks(token[4])
-        elif utils.is_multiline_string(token):
+        elif processor.is_multiline_string(token):
             # Less obviously, a string that contains newlines is a
             # multiline string, either triple-quoted or with internal
             # newlines backslash-escaped. Check every physical line in the
@@ -338,241 +325,3 @@ class FileChecker(object):
         with self.processor.inside_multiline(line_number=line_no):
             for line in self.processor.split_line(token):
                 self.run_physical_checks(line + '\n')
-
-
-class FileProcessor(object):
-    """Processes a file and holdes state.
-
-    This processes a file by generating tokens, logical and physical lines,
-    and AST trees. This also provides a way of passing state about the file
-    to checks expecting that state. Any public attribute on this object can
-    be requested by a plugin. The known public attributes are:
-
-    - blank_before
-    - blank_lines
-    - indect_char
-    - indent_level
-    - line_number
-    - logical_line
-    - max_line_length
-    - multiline
-    - noqa
-    - previous_indent_level
-    - previous_logical
-    - tokens
-    - total_lines
-    - verbose
-    """
-
-    def __init__(self, filename, options):
-        """Initialice our file processor.
-
-        :param str filename:
-            Name of the file to process
-        """
-        self.filename = filename
-        self.lines = self.read_lines()
-        self.strip_utf_bom()
-        self.options = options
-
-        # Defaults for public attributes
-        #: Number of preceding blank lines
-        self.blank_before = 0
-        #: Number of blank lines
-        self.blank_lines = 0
-        #: Character used for indentation
-        self.indent_char = None
-        #: Current level of indentation
-        self.indent_level = 0
-        #: Line number in the file
-        self.line_number = 0
-        #: Current logical line
-        self.logical_line = ''
-        #: Maximum line length as configured by the user
-        self.max_line_length = options.max_line_length
-        #: Whether the current physical line is multiline
-        self.multiline = False
-        #: Whether or not we're observing NoQA
-        self.noqa = False
-        #: Previous level of indentation
-        self.previous_indent_level = 0
-        #: Previous logical line
-        self.previous_logical = ''
-        #: Current set of tokens
-        self.tokens = []
-        #: Total number of lines in the file
-        self.total_lines = len(self.lines)
-        #: Verbosity level of Flake8
-        self.verbosity = options.verbosity
-
-    @contextlib.contextmanager
-    def inside_multiline(self, line_number):
-        """Context-manager to toggle the multiline attribute."""
-        self.line_number = line_number
-        self.multiline = True
-        yield
-        self.multiline = False
-
-    def reset_blank_before(self):
-        """Reset the blank_before attribute to zero."""
-        self.blank_before = 0
-
-    def delete_first_token(self):
-        """Delete the first token in the list of tokens."""
-        del self.tokens[0]
-
-    def visited_new_blank_line(self):
-        """Note that we visited a new blank line."""
-        self.blank_lines += 1
-
-    def build_logical_line_tokens(self):
-        """Build the mapping, comments, and logical line lists."""
-        logical = []
-        comments = []
-        length = 0
-        previous_row = previous_column = mapping = None
-        for token_type, text, start, end, line in self.tokens:
-            if token_type in SKIP_TOKENS:
-                continue
-            if not mapping:
-                mapping = [(0, start)]
-            if token_type == tokenize.COMMENT:
-                comments.append(text)
-                continue
-            if token_type == tokenize.STRING:
-                text = utils.mutate_string(text)
-            if previous_row:
-                (start_row, start_column) = start
-                if previous_row != start_row:
-                    row_index = previous_row - 1
-                    column_index = previous_column - 1
-                    previous_text = self.lines[row_index][column_index]
-                    if (previous_text == ',' or
-                            (previous_text not in '{[(' and
-                             text not in '}])')):
-                        text = ' ' + text
-                elif previous_column != start_column:
-                    text = line[previous_column:start_column] + text
-            logical.append(text)
-            length += len(text)
-            mapping.append((length, end))
-            (previous_row, previous_column) = end
-        return comments, logical, mapping
-
-    def build_logical_line(self):
-        """Build a logical line from the current tokens list."""
-        comments, logical, mapping_list = self.build_logical_line_tokens()
-        return ''.join(comments), ''.join(logical), mapping_list
-
-    def split_line(self, token):
-        """Split a physical line's line based on new-lines.
-
-        This also auto-increments the line number for the caller.
- """ - for line in token[1].split('\n')[:-1]: - yield line - self.line_number += 1 - - def keyword_arguments_for(self, parameters, arguments=None): - """Generate the keyword arguments for a list of parameters.""" - if arguments is None: - arguments = {} - for param in parameters: - if param not in arguments: - arguments[param] = getattr(self, param) - return arguments - - def check_physical_error(self, error_code, line): - """Update attributes based on error code and line.""" - if error_code == 'E101': - self.indent_char = line[0] - - def generate_tokens(self): - """Tokenize the file and yield the tokens. - - :raises flake8.exceptions.InvalidSyntax: - If a :class:`tokenize.TokenError` is raised while generating - tokens. - """ - try: - for token in tokenize.generate_tokens(self.next_line): - if token[2][0] > self.total_lines: - break - self.tokens.append(token) - yield token - # NOTE(sigmavirus24): pycodestyle was catching both a SyntaxError - # and a tokenize.TokenError. In looking a the source on Python 2 and - # Python 3, the SyntaxError should never arise from generate_tokens. - # If we were using tokenize.tokenize, we would have to catch that. Of - # course, I'm going to be unsurprised to be proven wrong at a later - # date. - except tokenize.TokenError as exc: - raise exceptions.InvalidSyntax(exc.message, exception=exc) - - def next_line(self): - """Get the next line from the list.""" - if self.line_number >= self.total_lines: - return '' - line = self.lines[self.line_number] - self.line_number += 1 - if self.indent_char is None and line[:1] in defaults.WHITESPACE: - self.indent_char = line[0] - return line - - def read_lines(self): - # type: () -> List[str] - """Read the lines for this file checker.""" - if self.filename is None or self.filename == '-': - self.filename = 'stdin' - return self.read_lines_from_stdin() - return self.read_lines_from_filename() - - def _readlines_py2(self): - # type: () -> List[str] - with open(self.filename, 'rU') as fd: - return fd.readlines() - - def _readlines_py3(self): - # type: () -> List[str] - try: - with open(self.filename, 'rb') as fd: - (coding, lines) = tokenize.detect_encoding(fd.readline) - textfd = io.TextIOWrapper(fd, coding, line_buffering=True) - return ([l.decode(coding) for l in lines] + - textfd.readlines()) - except (LookupError, SyntaxError, UnicodeError): - # If we can't detect the codec with tokenize.detect_encoding, or - # the detected encoding is incorrect, just fallback to latin-1. 
-            with open(self.filename, encoding='latin-1') as fd:
-                return fd.readlines()
-
-    def read_lines_from_filename(self):
-        # type: () -> List[str]
-        """Read the lines for a file."""
-        if (2, 6) <= sys.version_info < (3, 0):
-            readlines = self._readlines_py2
-        elif (3, 0) <= sys.version_info < (4, 0):
-            readlines = self._readlines_py3
-        return readlines()
-
-    def read_lines_from_stdin(self):
-        # type: () -> List[str]
-        """Read the lines from standard in."""
-        return utils.stdin_get_value().splitlines(True)
-
-    def strip_utf_bom(self):
-        # type: () -> NoneType
-        """Strip the UTF bom from the lines of the file."""
-        if not self.lines:
-            # If we have nothing to analyze quit early
-            return
-
-        first_byte = ord(self.lines[0][0])
-        if first_byte not in (0xEF, 0xFEFF):
-            return
-
-        # If the first byte of the file is a UTF-8 BOM, strip it
-        if first_byte == 0xFEFF:
-            self.lines[0] = self.lines[0][1:]
-        elif self.lines[0][:3] == '\xEF\xBB\xBF':
-            self.lines[0] = self.lines[0][3:]
diff --git a/flake8/processor.py b/flake8/processor.py
new file mode 100644
index 0000000..e681c03
--- /dev/null
+++ b/flake8/processor.py
@@ -0,0 +1,300 @@
+"""Module containing our file processor that tokenizes a file for checks."""
+import contextlib
+import io
+import sys
+import tokenize
+
+from flake8 import defaults
+from flake8 import exceptions
+from flake8 import utils
+
+NEWLINE = frozenset([tokenize.NL, tokenize.NEWLINE])
+# Work around Python < 2.6 behaviour, which does not generate NL after
+# a comment which is on a line by itself.
+COMMENT_WITH_NL = tokenize.generate_tokens(['#\n'].pop).send(None)[1] == '#\n'
+
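+# Token types that never contribute text to a logical line; they are
+# skipped when ``FileProcessor.build_logical_line_tokens`` (below) joins
+# the remaining tokens into a single logical line.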
+SKIP_TOKENS = frozenset([tokenize.NL, tokenize.NEWLINE, tokenize.INDENT,
+                         tokenize.DEDENT])
+
+
+class FileProcessor(object):
+    """Processes a file and holds state.
+
+    This processes a file by generating tokens, logical and physical lines,
+    and AST trees. This also provides a way of passing state about the file
+    to checks expecting that state. Any public attribute on this object can
+    be requested by a plugin. The known public attributes are:
+
+    - blank_before
+    - blank_lines
+    - indent_char
+    - indent_level
+    - line_number
+    - logical_line
+    - max_line_length
+    - multiline
+    - noqa
+    - previous_indent_level
+    - previous_logical
+    - tokens
+    - total_lines
+    - verbosity
+    """
+
+    def __init__(self, filename, options):
+        """Initialize our file processor.
+
+        :param str filename:
+            Name of the file to process
+        :param options:
+            Parsed options used to configure this processor
+        """
+        self.filename = filename
+        self.lines = self.read_lines()
+        self.strip_utf_bom()
+        self.options = options
+
+        # Defaults for public attributes
+        #: Number of preceding blank lines
+        self.blank_before = 0
+        #: Number of blank lines
+        self.blank_lines = 0
+        #: Character used for indentation
+        self.indent_char = None
+        #: Current level of indentation
+        self.indent_level = 0
+        #: Line number in the file
+        self.line_number = 0
+        #: Current logical line
+        self.logical_line = ''
+        #: Maximum line length as configured by the user
+        self.max_line_length = options.max_line_length
+        #: Whether the current physical line is multiline
+        self.multiline = False
+        #: Whether or not we're observing NoQA
+        self.noqa = False
+        #: Previous level of indentation
+        self.previous_indent_level = 0
+        #: Previous logical line
+        self.previous_logical = ''
+        #: Current set of tokens
+        self.tokens = []
+        #: Total number of lines in the file
+        self.total_lines = len(self.lines)
+        #: Verbosity level of Flake8
+        self.verbosity = options.verbosity
+
+    @contextlib.contextmanager
+    def inside_multiline(self, line_number):
+        """Context-manager to toggle the multiline attribute."""
+        self.line_number = line_number
+        self.multiline = True
+        yield
+        self.multiline = False
+
+    def reset_blank_before(self):
+        """Reset the blank_before attribute to zero."""
+        self.blank_before = 0
+
+    def delete_first_token(self):
+        """Delete the first token in the list of tokens."""
+        del self.tokens[0]
+
+    def visited_new_blank_line(self):
+        """Note that we visited a new blank line."""
+        self.blank_lines += 1
+
+    def build_logical_line_tokens(self):
+        """Build the mapping, comments, and logical line lists."""
+        logical = []
+        comments = []
+        length = 0
+        previous_row = previous_column = mapping = None
+        for token_type, text, start, end, line in self.tokens:
+            if token_type in SKIP_TOKENS:
+                continue
+            if not mapping:
+                mapping = [(0, start)]
+            if token_type == tokenize.COMMENT:
+                comments.append(text)
+                continue
+            if token_type == tokenize.STRING:
+                text = utils.mutate_string(text)
+            if previous_row:
+                (start_row, start_column) = start
+                if previous_row != start_row:
+                    row_index = previous_row - 1
+                    column_index = previous_column - 1
+                    previous_text = self.lines[row_index][column_index]
+                    if (previous_text == ',' or
+                            (previous_text not in '{[(' and
+                             text not in '}])')):
+                        text = ' ' + text
+                elif previous_column != start_column:
+                    text = line[previous_column:start_column] + text
+            logical.append(text)
+            length += len(text)
+            mapping.append((length, end))
+            (previous_row, previous_column) = end
+        return comments, logical, mapping
+
+    def build_logical_line(self):
+        """Build a logical line from the current tokens list."""
+        comments, logical, mapping_list = self.build_logical_line_tokens()
+        return ''.join(comments), ''.join(logical), mapping_list
+
+    def split_line(self, token):
+        """Split a physical line based on new-lines.
+
+        This also auto-increments the line number for the caller.
+        """
+        for line in token[1].split('\n')[:-1]:
+            yield line
+            self.line_number += 1
+
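+    # Example of how state reaches plugins: a check declared as, say,
+    # ``def check(logical_line, max_line_length): ...`` is invoked with the
+    # current values of those public attributes, gathered from this
+    # processor by ``keyword_arguments_for`` below via ``getattr``.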
+ """ + for line in token[1].split('\n')[:-1]: + yield line + self.line_number += 1 + + def keyword_arguments_for(self, parameters, arguments=None): + """Generate the keyword arguments for a list of parameters.""" + if arguments is None: + arguments = {} + for param in parameters: + if param not in arguments: + arguments[param] = getattr(self, param) + return arguments + + def check_physical_error(self, error_code, line): + """Update attributes based on error code and line.""" + if error_code == 'E101': + self.indent_char = line[0] + + def generate_tokens(self): + """Tokenize the file and yield the tokens. + + :raises flake8.exceptions.InvalidSyntax: + If a :class:`tokenize.TokenError` is raised while generating + tokens. + """ + try: + for token in tokenize.generate_tokens(self.next_line): + if token[2][0] > self.total_lines: + break + self.tokens.append(token) + yield token + # NOTE(sigmavirus24): pycodestyle was catching both a SyntaxError + # and a tokenize.TokenError. In looking a the source on Python 2 and + # Python 3, the SyntaxError should never arise from generate_tokens. + # If we were using tokenize.tokenize, we would have to catch that. Of + # course, I'm going to be unsurprised to be proven wrong at a later + # date. + except tokenize.TokenError as exc: + raise exceptions.InvalidSyntax(exc.message, exception=exc) + + def next_line(self): + """Get the next line from the list.""" + if self.line_number >= self.total_lines: + return '' + line = self.lines[self.line_number] + self.line_number += 1 + if self.indent_char is None and line[:1] in defaults.WHITESPACE: + self.indent_char = line[0] + return line + + def read_lines(self): + # type: () -> List[str] + """Read the lines for this file checker.""" + if self.filename is None or self.filename == '-': + self.filename = 'stdin' + return self.read_lines_from_stdin() + return self.read_lines_from_filename() + + def _readlines_py2(self): + # type: () -> List[str] + with open(self.filename, 'rU') as fd: + return fd.readlines() + + def _readlines_py3(self): + # type: () -> List[str] + try: + with open(self.filename, 'rb') as fd: + (coding, lines) = tokenize.detect_encoding(fd.readline) + textfd = io.TextIOWrapper(fd, coding, line_buffering=True) + return ([l.decode(coding) for l in lines] + + textfd.readlines()) + except (LookupError, SyntaxError, UnicodeError): + # If we can't detect the codec with tokenize.detect_encoding, or + # the detected encoding is incorrect, just fallback to latin-1. 
+    def next_line(self):
+        """Get the next line from the list."""
+        if self.line_number >= self.total_lines:
+            return ''
+        line = self.lines[self.line_number]
+        self.line_number += 1
+        if self.indent_char is None and line[:1] in defaults.WHITESPACE:
+            self.indent_char = line[0]
+        return line
+
+    def read_lines(self):
+        # type: () -> List[str]
+        """Read the lines for this file checker."""
+        if self.filename is None or self.filename == '-':
+            self.filename = 'stdin'
+            return self.read_lines_from_stdin()
+        return self.read_lines_from_filename()
+
+    def _readlines_py2(self):
+        # type: () -> List[str]
+        with open(self.filename, 'rU') as fd:
+            return fd.readlines()
+
+    def _readlines_py3(self):
+        # type: () -> List[str]
+        try:
+            with open(self.filename, 'rb') as fd:
+                (coding, lines) = tokenize.detect_encoding(fd.readline)
+                textfd = io.TextIOWrapper(fd, coding, line_buffering=True)
+                return ([l.decode(coding) for l in lines] +
+                        textfd.readlines())
+        except (LookupError, SyntaxError, UnicodeError):
+            # If we can't detect the codec with tokenize.detect_encoding, or
+            # the detected encoding is incorrect, just fall back to latin-1.
+            with open(self.filename, encoding='latin-1') as fd:
+                return fd.readlines()
+
+    def read_lines_from_filename(self):
+        # type: () -> List[str]
+        """Read the lines for a file."""
+        if (2, 6) <= sys.version_info < (3, 0):
+            readlines = self._readlines_py2
+        elif (3, 0) <= sys.version_info < (4, 0):
+            readlines = self._readlines_py3
+        return readlines()
+
+    def read_lines_from_stdin(self):
+        # type: () -> List[str]
+        """Read the lines from standard in."""
+        return utils.stdin_get_value().splitlines(True)
+
+    def strip_utf_bom(self):
+        # type: () -> NoneType
+        """Strip the UTF bom from the lines of the file."""
+        if not self.lines:
+            # If we have nothing to analyze quit early
+            return
+
+        first_byte = ord(self.lines[0][0])
+        if first_byte not in (0xEF, 0xFEFF):
+            return
+
+        # If the first byte of the file is a UTF-8 BOM, strip it
+        if first_byte == 0xFEFF:
+            self.lines[0] = self.lines[0][1:]
+        elif self.lines[0][:3] == '\xEF\xBB\xBF':
+            self.lines[0] = self.lines[0][3:]
+
+
+def is_eol_token(token):
+    """Check if the token is an end-of-line token."""
+    return token[0] in NEWLINE or token[4][token[3][1]:].lstrip() == '\\\n'
+
+if COMMENT_WITH_NL:  # If on Python < 2.6
+    def is_eol_token(token, _is_eol_token=is_eol_token):
+        """Check if the token is an end-of-line token."""
+        return (_is_eol_token(token) or
+                (token[0] == tokenize.COMMENT and token[1] == token[4]))
+
+
+def is_multiline_string(token):
+    """Check if this is a multiline string."""
+    return token[0] == tokenize.STRING and '\n' in token[1]
+
+
+def token_is_newline(token):
+    """Check if the token type is a newline token type."""
+    return token[0] in NEWLINE
+
+
+def token_is_comment(token):
+    """Check if the token type is a comment."""
+    return COMMENT_WITH_NL and token[0] == tokenize.COMMENT
+
+
+def count_parentheses(current_parentheses_count, token_text):
+    """Count the number of parentheses."""
+    if token_text in '([{':
+        return current_parentheses_count + 1
+    elif token_text in '}])':
+        return current_parentheses_count - 1
+    # Any other operator text leaves the count unchanged.
+    return current_parentheses_count
+
+
+def log_token(log, token):
+    """Log a token to a provided logging object."""
+    if token[2][0] == token[3][0]:
+        pos = '[%s:%s]' % (token[2][1] or '', token[3][1])
+    else:
+        pos = 'l.%s' % token[3][0]
+    log.debug('l.%s\t%s\t%s\t%r' %
+              (token[2][0], pos, tokenize.tok_name[token[0]],
+               token[1]))
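+
+
+# NOTE: ``FileChecker.process_tokens`` relies on ``count_parentheses`` to
+# track bracket nesting: newline and comment tokens only end a logical
+# line when the nesting count is zero, so for example the newline inside
+# a call like ``f(1,\n    2)`` does not terminate the logical line.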