Merge branch 'origin/proposed/3.0' into master

This commit is contained in:
Ian Cordasco 2016-06-25 12:01:02 -05:00
commit cee691059f
No known key found for this signature in database
GPG key ID: 656D3395E4A9791A
167 changed files with 11120 additions and 3162 deletions

82
src/flake8/__init__.py Normal file
View file

@@ -0,0 +1,82 @@
"""Top-level module for Flake8.
This module
- initializes logging for the command-line tool
- tracks the version of the package
- provides a way to configure logging for the command-line tool
.. autofunction:: flake8.configure_logging
"""
import logging
try:
from logging import NullHandler
except ImportError:
class NullHandler(logging.Handler):
"""Shim for version of Python < 2.7."""
def emit(self, record):
"""Do nothing."""
pass
import sys
LOG = logging.getLogger(__name__)
LOG.addHandler(NullHandler())
# Clean up after LOG config
del NullHandler
__version__ = '3.0.0b1'
__version_info__ = tuple(int(i) for i in __version__.split('.') if i.isdigit())
# There is nothing lower than logging.DEBUG (10) in the logging library,
# but we want an extra level to avoid being too verbose when using -vv.
_EXTRA_VERBOSE = 5
logging.addLevelName(_EXTRA_VERBOSE, 'VERBOSE')
_VERBOSITY_TO_LOG_LEVEL = {
# output more than warnings but not debugging info
1: logging.INFO, # INFO is a numerical level of 20
# output debugging information
2: logging.DEBUG, # DEBUG is a numerical level of 10
# output extra verbose debugging information
3: _EXTRA_VERBOSE,
}
LOG_FORMAT = ('%(name)-25s %(processName)-11s %(relativeCreated)6d '
'%(levelname)-8s %(message)s')
def configure_logging(verbosity, filename=None, logformat=LOG_FORMAT):
    """Configure logging for flake8.

    :param int verbosity:
        How verbose to be in logging information.
    :param str filename:
        Name of the file to append log information to.
        If ``None`` this will log to ``sys.stderr``.
        If the name is "stdout" or "stderr" this will log to the appropriate
        stream.
    """
    if verbosity <= 0:
        # Logging was not requested; leave the module logger untouched.
        return
    # Anything beyond -vvv maps onto the most verbose level we know about.
    verbosity = min(verbosity, 3)
    log_level = _VERBOSITY_TO_LOG_LEVEL[verbosity]

    if not filename or filename in ('stderr', 'stdout'):
        # Log to one of the standard streams.
        target = getattr(sys, filename or 'stderr')
        handler_class = logging.StreamHandler
    else:
        # Log to the named file instead.
        target = filename
        handler_class = logging.FileHandler

    handler = handler_class(target)
    handler.setFormatter(logging.Formatter(logformat))
    LOG.addHandler(handler)
    LOG.setLevel(log_level)
    LOG.debug('Added a %s logging handler to logger root at %s',
              filename, __name__)

4
src/flake8/__main__.py Normal file
View file

@@ -0,0 +1,4 @@
"""Module allowing for ``python -m flake8 ...``."""
from flake8.main import cli
cli.main()

View file

@@ -0,0 +1,10 @@
"""Module containing all public entry-points for Flake8.
This is the only submodule in Flake8 with a guaranteed stable API. All other
submodules are considered internal only and are subject to change.
"""
def get_style_guide(**kwargs):
    """Stub for the public style-guide entry-point.

    Accepts and ignores arbitrary keyword arguments; a real implementation
    is expected to replace this stub.
    """
    return None

610
src/flake8/checker.py Normal file
View file

@@ -0,0 +1,610 @@
"""Checker Manager and Checker classes."""
import errno
import logging
import os
import sys
import tokenize
try:
import multiprocessing
except ImportError:
multiprocessing = None
try:
import Queue as queue
except ImportError:
import queue
from flake8 import defaults
from flake8 import exceptions
from flake8 import processor
from flake8 import utils
LOG = logging.getLogger(__name__)

#: Errno values for which we abandon multiprocessing and retry serially.
SERIAL_RETRY_ERRNOS = set([
    # ENOSPC: Added by sigmavirus24
    # > On some operating systems (OSX), multiprocessing may cause an
    # > ENOSPC error while trying to create a Semaphore.
    # > In those cases, we should replace the customized Queue Report
    # > class with pep8's StandardReport class to ensure users don't run
    # > into this problem.
    # > (See also: https://gitlab.com/pycqa/flake8/issues/74)
    errno.ENOSPC,
    # NOTE(sigmavirus24): When adding to this list, include the reasoning
    # on the lines before the error code and always append your error
    # code. Further, please always add a trailing `,` to reduce the visual
    # noise in diffs.
])
class Manager(object):
    """Manage the parallelism and checker instances for each plugin and file.

    This class will be responsible for the following:

    - Determining the parallelism of Flake8, e.g.:

      * Do we use :mod:`multiprocessing` or is it unavailable?

      * Do we automatically decide on the number of jobs to use or did the
        user provide that?

    - Falling back to a serial way of processing files if we run into an
      OSError related to :mod:`multiprocessing`

    - Organizing the results of each checker so we can group the output
      together and make our output deterministic.
    """

    def __init__(self, style_guide, arguments, checker_plugins):
        """Initialize our Manager instance.

        :param style_guide:
            The instantiated style guide for this instance of Flake8.
        :type style_guide:
            flake8.style_guide.StyleGuide
        :param list arguments:
            The extra arguments parsed from the CLI (if any)
        :param checker_plugins:
            The plugins representing checks parsed from entry-points.
        :type checker_plugins:
            flake8.plugins.manager.Checkers
        """
        self.arguments = arguments
        self.style_guide = style_guide
        self.options = style_guide.options
        self.checks = checker_plugins
        self.jobs = self._job_count()
        self.process_queue = None
        self.results_queue = None
        self.statistics_queue = None
        self.using_multiprocessing = self.jobs > 1
        self.processes = []
        self.checkers = []
        # Aggregate totals across all files; 'files' is filled in by
        # _process_statistics, the rest accumulate per-checker counts.
        self.statistics = {
            'files': 0,
            'logical lines': 0,
            'physical lines': 0,
            'tokens': 0,
        }
        if self.using_multiprocessing:
            # Creating a multiprocessing.Queue allocates OS resources (e.g.,
            # a semaphore) and can fail; for the known-recoverable errnos we
            # silently fall back to serial processing.
            try:
                self.process_queue = multiprocessing.Queue()
                self.results_queue = multiprocessing.Queue()
                self.statistics_queue = multiprocessing.Queue()
            except OSError as oserr:
                if oserr.errno not in SERIAL_RETRY_ERRNOS:
                    raise
                self.using_multiprocessing = False

    @staticmethod
    def _cleanup_queue(q):
        # Drain a queue without blocking so worker processes can exit.
        while not q.empty():
            q.get_nowait()

    def _force_cleanup(self):
        # Give each worker a brief chance to finish, then drain all queues.
        if self.using_multiprocessing:
            for proc in self.processes:
                proc.join(0.2)
            self._cleanup_queue(self.process_queue)
            self._cleanup_queue(self.results_queue)
            self._cleanup_queue(self.statistics_queue)

    def _process_statistics(self):
        # Fold each checker's statistics into our aggregate totals, reading
        # from the statistics queue when workers produced them.
        all_statistics = self.statistics
        if self.using_multiprocessing:
            total_number_of_checkers = len(self.checkers)
            statistics_gathered = 0
            while statistics_gathered < total_number_of_checkers:
                try:
                    statistics = self.statistics_queue.get(block=False)
                    statistics_gathered += 1
                except queue.Empty:
                    # Workers that failed may never post statistics; stop
                    # rather than wait forever.
                    break
                for statistic in defaults.STATISTIC_NAMES:
                    all_statistics[statistic] += statistics[statistic]
        else:
            statistics_generator = (checker.statistics
                                    for checker in self.checkers)
            for statistics in statistics_generator:
                for statistic in defaults.STATISTIC_NAMES:
                    all_statistics[statistic] += statistics[statistic]
        all_statistics['files'] += len(self.checkers)

    def _job_count(self):
        # type: () -> Union[int, NoneType]
        # First we walk through all of our error cases:
        # - multiprocessing library is not present
        # - we're running on windows in which case we know we have significant
        #   implementation issues
        # - the user provided stdin and that's not something we can handle
        #   well
        # - we're processing a diff, which again does not work well with
        #   multiprocessing and which really shouldn't require multiprocessing
        # - the user provided some awful input
        if not multiprocessing:
            LOG.warning('The multiprocessing module is not available. '
                        'Ignoring --jobs arguments.')
            return 0
        if (utils.is_windows() and
                not utils.can_run_multiprocessing_on_windows()):
            LOG.warning('The --jobs option is only available on Windows on '
                        'Python 2.7.11+ and 3.3+. We have detected that you '
                        'are running an unsupported version of Python on '
                        'Windows. Ignoring --jobs arguments.')
            return 0
        if utils.is_using_stdin(self.arguments):
            LOG.warning('The --jobs option is not compatible with supplying '
                        'input using - . Ignoring --jobs arguments.')
            return 0
        if self.options.diff:
            LOG.warning('The --diff option was specified with --jobs but '
                        'they are not compatible. Ignoring --jobs arguments.')
            return 0
        jobs = self.options.jobs
        if jobs != 'auto' and not jobs.isdigit():
            LOG.warning('"%s" is not a valid parameter to --jobs. Must be one '
                        'of "auto" or a numerical value, e.g., 4.', jobs)
            return 0
        # If the value is "auto", we want to let the multiprocessing library
        # decide the number based on the number of CPUs. However, if that
        # function is not implemented for this particular value of Python we
        # fall back to serial processing (0).
        if jobs == 'auto':
            try:
                return multiprocessing.cpu_count()
            except NotImplementedError:
                return 0
        # Otherwise, we know jobs should be an integer and we can just convert
        # it to an integer
        return int(jobs)

    def _results(self):
        # Yield results from the workers until every one of the self.jobs
        # processes has posted its 'DONE' sentinel.
        seen_done = 0
        LOG.info('Retrieving results')
        while True:
            result = self.results_queue.get()
            if result == 'DONE':
                seen_done += 1
                if seen_done >= self.jobs:
                    break
                continue
            yield result

    def _handle_results(self, filename, results):
        # Forward each raw result tuple to the style guide; it decides which
        # are actually reported. Returns the number reported.
        style_guide = self.style_guide
        reported_results_count = 0
        for (error_code, line_number, column, text, physical_line) in results:
            reported_results_count += style_guide.handle_error(
                code=error_code,
                filename=filename,
                line_number=line_number,
                column_number=column,
                text=text,
                physical_line=physical_line,
            )
        return reported_results_count

    def _run_checks_from_queue(self):
        # Worker-process entry point: consume checkers until the 'DONE'
        # sentinel appears, then post our own 'DONE' on the results queue.
        LOG.info('Running checks in parallel')
        for checker in iter(self.process_queue.get, 'DONE'):
            LOG.debug('Running checker for file "%s"', checker.filename)
            checker.run_checks(self.results_queue, self.statistics_queue)
        self.results_queue.put('DONE')

    def is_path_excluded(self, path):
        # type: (str) -> bool
        """Check if a path is excluded.

        :param str path:
            Path to check against the exclude patterns.
        :returns:
            True if there are exclude patterns and the path matches,
            otherwise False.
        :rtype:
            bool
        """
        exclude = self.options.exclude
        if not exclude:
            return False
        # Try the basename first, then the absolute path.
        basename = os.path.basename(path)
        if utils.fnmatch(basename, exclude):
            LOG.info('"%s" has been excluded', basename)
            return True
        absolute_path = os.path.abspath(path)
        match = utils.fnmatch(absolute_path, exclude)
        LOG.info('"%s" has %sbeen excluded', absolute_path,
                 '' if match else 'not ')
        return match

    def make_checkers(self, paths=None):
        # type: (List[str]) -> NoneType
        """Create checkers for each file."""
        if paths is None:
            paths = self.arguments
        filename_patterns = self.options.filename

        # NOTE(sigmavirus24): Yes this is a little unsightly, but it's our
        # best solution right now.
        def should_create_file_checker(filename):
            """Determine if we should create a file checker."""
            matches_filename_patterns = utils.fnmatch(
                filename, filename_patterns
            )
            is_stdin = filename == '-'
            file_exists = os.path.exists(filename)
            return (file_exists and matches_filename_patterns) or is_stdin

        self.checkers = [
            FileChecker(filename, self.checks, self.style_guide)
            for argument in paths
            for filename in utils.filenames_from(argument,
                                                 self.is_path_excluded)
            if should_create_file_checker(filename)
        ]

    def report(self):
        # type: () -> (int, int)
        """Report all of the errors found in the managed file checkers.

        This iterates over each of the checkers and reports the errors sorted
        by line number.

        :returns:
            A tuple of the total results found and the results reported.
        :rtype:
            tuple(int, int)
        """
        results_reported = results_found = 0
        for checker in self.checkers:
            # NOTE(review): result tuples are (code, line, column, text,
            # physical_line), so this key sorts by (column, text) whereas
            # run_parallel sorts by (line, column). One of the two orderings
            # looks unintended -- confirm which is desired.
            results = sorted(checker.results, key=lambda tup: (tup[2], tup[3]))
            results_reported += self._handle_results(checker.filename,
                                                     results)
            results_found += len(results)
        return (results_found, results_reported)

    def run_parallel(self):
        """Run the checkers in parallel."""
        LOG.info('Starting %d process workers', self.jobs)
        for i in range(self.jobs):
            proc = multiprocessing.Process(
                target=self._run_checks_from_queue
            )
            proc.daemon = True
            proc.start()
            self.processes.append(proc)
        # Collect every (filename, results) pair the workers produce, then
        # store each checker's results back on the checker, sorted by
        # (line_number, column).
        final_results = {}
        for (filename, results) in self._results():
            final_results[filename] = results
        for checker in self.checkers:
            filename = checker.filename
            checker.results = sorted(final_results.get(filename, []),
                                     key=lambda tup: (tup[1], tup[2]))

    def run_serial(self):
        """Run the checkers in serial."""
        for checker in self.checkers:
            checker.run_checks(self.results_queue, self.statistics_queue)

    def run(self):
        """Run all the checkers.

        This will intelligently decide whether to run the checks in parallel
        or whether to run them in serial.

        If running the checks in parallel causes a problem (e.g.,
        https://gitlab.com/pycqa/flake8/issues/74) this also implements
        fallback to serial processing.
        """
        try:
            if self.using_multiprocessing:
                self.run_parallel()
            else:
                self.run_serial()
        except OSError as oserr:
            if oserr.errno not in SERIAL_RETRY_ERRNOS:
                LOG.exception(oserr)
                raise
            LOG.warning('Running in serial after OS exception, %r', oserr)
            self.run_serial()

    def start(self, paths=None):
        """Start checking files.

        :param list paths:
            Path names to check. This is passed directly to
            :meth:`~Manager.make_checkers`.
        """
        LOG.info('Making checkers')
        self.make_checkers(paths)
        if not self.using_multiprocessing:
            return
        # Load the work queue and add one 'DONE' sentinel per worker so each
        # worker process knows when to stop consuming.
        LOG.info('Populating process queue')
        for checker in self.checkers:
            self.process_queue.put(checker)
        for i in range(self.jobs):
            self.process_queue.put('DONE')

    def stop(self):
        """Stop checking files."""
        self._process_statistics()
        for proc in self.processes:
            LOG.info('Joining %s to the main process', proc.name)
            proc.join()
class FileChecker(object):
    """Manage running checks for a file and aggregate the results."""

    def __init__(self, filename, checks, style_guide):
        """Initialize our file checker.

        :param str filename:
            Name of the file to check.
        :param checks:
            The plugins registered to check the file.
        :type checks:
            flake8.plugins.manager.Checkers
        :param style_guide:
            The initialized StyleGuide for this particular run.
        :type style_guide:
            flake8.style_guide.StyleGuide
        """
        self.filename = filename
        self.checks = checks
        self.style_guide = style_guide
        self.results = []
        self.processor = self._make_processor()
        # NOTE(review): _make_processor returns None when the file cannot be
        # read, in which case ``self.processor.lines`` below raises
        # AttributeError before the stored E902 is ever reported -- confirm
        # whether that path is reachable in practice.
        self.statistics = {
            'tokens': 0,
            'logical lines': 0,
            'physical lines': len(self.processor.lines),
        }

    def _make_processor(self):
        try:
            return processor.FileProcessor(self.filename,
                                           self.style_guide.options)
        except IOError:
            # If we can not read the file due to an IOError (e.g., the file
            # does not exist or we do not have the permissions to open it)
            # then we need to format that exception for the user.
            # NOTE(sigmavirus24): Historically, pep8 has always reported this
            # as an E902. We probably *want* a better error code for this
            # going forward.
            (exc_type, exception) = sys.exc_info()[:2]
            message = '{0}: {1}'.format(exc_type.__name__, exception)
            self.report('E902', 0, 0, message)
            return None

    def report(self, error_code, line_number, column, text):
        # type: (str, int, int, str) -> str
        """Report an error by storing it in the results list."""
        if error_code is None:
            # Plugins report "E123 some text"; split the code off the front.
            error_code, text = text.split(' ', 1)
        physical_line = ''
        # If we're recovering from a problem in _make_processor, we will not
        # have this attribute.
        if getattr(self, 'processor', None):
            physical_line = self.processor.line_for(line_number)
        error = (error_code, line_number, column, text, physical_line)
        self.results.append(error)
        return error_code

    def run_check(self, plugin, **arguments):
        """Run the check in a single plugin."""
        LOG.debug('Running %r with %r', plugin, arguments)
        # Let the processor fill in the extra keyword arguments the plugin
        # declared in its signature.
        self.processor.keyword_arguments_for(plugin.parameters, arguments)
        return plugin.execute(**arguments)

    def run_ast_checks(self):
        """Run all checks expecting an abstract syntax tree."""
        try:
            ast = self.processor.build_ast()
        except (ValueError, SyntaxError, TypeError):
            (exc_type, exception) = sys.exc_info()[:2]
            # Try to pull a (line, column) location out of the exception
            # args; fall back to (1, 0) when none is available.
            if len(exception.args) > 1:
                offset = exception.args[1]
                if len(offset) > 2:
                    offset = offset[1:3]
            else:
                offset = (1, 0)
            self.report('E999', offset[0], offset[1], '%s: %s' %
                        (exc_type.__name__, exception.args[0]))
            return

        for plugin in self.checks.ast_plugins:
            checker = self.run_check(plugin, tree=ast)
            # NOTE(sigmavirus24): If we want to allow for AST plugins that are
            # not classes exclusively, we can do the following:
            # retrieve_results = getattr(checker, 'run', lambda: checker)
            # Otherwise, we just call run on the checker
            for (line_number, offset, text, check) in checker.run():
                self.report(
                    error_code=None,
                    line_number=line_number,
                    column=offset,
                    text=text,
                )

    def run_logical_checks(self):
        """Run all checks expecting a logical line."""
        comments, logical_line, mapping = self.processor.build_logical_line()
        if not mapping:
            # Nothing mapped back to physical lines; no logical line to check.
            return
        self.processor.update_state(mapping)
        LOG.debug('Logical line: "%s"', logical_line.rstrip())
        for plugin in self.checks.logical_line_plugins:
            self.processor.update_checker_state_for(plugin)
            results = self.run_check(plugin, logical_line=logical_line) or ()
            for offset, text in results:
                # Translate the logical-line offset back into a physical
                # (line, column) position.
                offset = find_offset(offset, mapping)
                line_number, column_offset = offset
                self.report(
                    error_code=None,
                    line_number=line_number,
                    column=column_offset,
                    text=text,
                )
        self.processor.next_logical_line()

    def run_physical_checks(self, physical_line):
        """Run all checks for a given physical line."""
        for plugin in self.checks.physical_line_plugins:
            self.processor.update_checker_state_for(plugin)
            result = self.run_check(plugin, physical_line=physical_line)
            if result is not None:
                column_offset, text = result
                error_code = self.report(
                    error_code=None,
                    line_number=self.processor.line_number,
                    column=column_offset,
                    text=text,
                )
                self.processor.check_physical_error(error_code, physical_line)

    def process_tokens(self):
        """Process tokens and trigger checks.

        This can raise a :class:`flake8.exceptions.InvalidSyntax` exception.
        Instead of using this directly, you should use
        :meth:`flake8.checker.FileChecker.run_checks`.
        """
        # parens tracks open bracket depth; logical-line boundaries are only
        # meaningful at depth 0.
        parens = 0
        statistics = self.statistics
        file_processor = self.processor
        for token in file_processor.generate_tokens():
            statistics['tokens'] += 1
            self.check_physical_eol(token)
            token_type, text = token[0:2]
            processor.log_token(LOG, token)
            if token_type == tokenize.OP:
                parens = processor.count_parentheses(parens, text)
            elif parens == 0:
                if processor.token_is_newline(token):
                    self.handle_newline(token_type)
                elif (processor.token_is_comment(token) and
                        len(file_processor.tokens) == 1):
                    self.handle_comment(token, text)
        if file_processor.tokens:
            # If any tokens are left over, process them
            self.run_physical_checks(file_processor.lines[-1])
            self.run_logical_checks()

    def run_checks(self, results_queue, statistics_queue):
        """Run checks against the file."""
        if self.processor.should_ignore_file():
            return
        try:
            self.process_tokens()
        except exceptions.InvalidSyntax as exc:
            self.report(exc.error_code, exc.line_number, exc.column_number,
                        exc.error_message)
        self.run_ast_checks()
        # Queues are only provided when running under multiprocessing; in
        # serial mode the Manager reads results/statistics off the checker.
        if results_queue is not None:
            results_queue.put((self.filename, self.results))
        logical_lines = self.processor.statistics['logical lines']
        self.statistics['logical lines'] = logical_lines
        if statistics_queue is not None:
            statistics_queue.put(self.statistics)

    def handle_comment(self, token, token_text):
        """Handle the logic when encountering a comment token."""
        # The comment also ends a physical line
        token = list(token)
        token[1] = token_text.rstrip('\r\n')
        # Recompute the end position after stripping the trailing newline.
        token[3] = (token[2][0], token[2][1] + len(token[1]))
        self.processor.tokens = [tuple(token)]
        self.run_logical_checks()

    def handle_newline(self, token_type):
        """Handle the logic when encountering a newline token."""
        if token_type == tokenize.NEWLINE:
            self.run_logical_checks()
            self.processor.reset_blank_before()
        elif len(self.processor.tokens) == 1:
            # The physical line contains only this token.
            self.processor.visited_new_blank_line()
            self.processor.delete_first_token()
        else:
            self.run_logical_checks()

    def check_physical_eol(self, token):
        """Run physical checks if and only if it is at the end of the line."""
        if processor.is_eol_token(token):
            # Obviously, a newline token ends a single physical line.
            self.run_physical_checks(token[4])
        elif processor.is_multiline_string(token):
            # Less obviously, a string that contains newlines is a
            # multiline string, either triple-quoted or with internal
            # newlines backslash-escaped. Check every physical line in the
            # string *except* for the last one: its newline is outside of
            # the multiline string, so we consider it a regular physical
            # line, and will check it like any other physical line.
            #
            # Subtleties:
            # - have to wind self.line_number back because initially it
            #   points to the last line of the string, and we want
            #   check_physical() to give accurate feedback
            line_no = token[2][0]
            with self.processor.inside_multiline(line_number=line_no):
                for line in self.processor.split_line(token):
                    self.run_physical_checks(line + '\n')
def find_offset(offset, mapping):
    """Translate a logical-line offset into a (row, column) pair.

    ``offset`` may already be a tuple, in which case it is returned
    unchanged. Otherwise ``mapping`` is scanned for the first entry whose
    token offset is at least ``offset`` (falling back to the last entry),
    and the physical position is adjusted by the remaining distance.
    """
    if isinstance(offset, tuple):
        # Already a (row, column) pair; nothing to translate.
        return offset
    for token_offset, position in mapping:
        if token_offset >= offset:
            break
    row, column = position
    return (row, column + offset - token_offset)

17
src/flake8/defaults.py Normal file
View file

@@ -0,0 +1,17 @@
"""Constants that define defaults."""
EXCLUDE = '.svn,CVS,.bzr,.hg,.git,__pycache__,.tox'
IGNORE = 'E121,E123,E126,E226,E24,E704,W503,W504'
SELECT = 'E,F,W,C'
MAX_LINE_LENGTH = 79
TRUTHY_VALUES = set(['true', '1', 't'])
# Other constants
WHITESPACE = frozenset(' \t')
STATISTIC_NAMES = (
'logical lines',
'physical lines',
'tokens',
)

96
src/flake8/exceptions.py Normal file
View file

@@ -0,0 +1,96 @@
"""Exception classes for all of Flake8."""
class Flake8Exception(Exception):
    """Base exception from which all Flake8-specific errors derive."""
class FailedToLoadPlugin(Flake8Exception):
    """Exception raised when a plugin fails to load."""

    FORMAT = 'Flake8 failed to load plugin "%(name)s" due to %(exc)s.'

    def __init__(self, *args, **kwargs):
        """Extract the plugin and original exception, then delegate.

        Requires ``plugin`` and ``exception`` keyword arguments.
        """
        plugin = kwargs.pop('plugin')
        original_exception = kwargs.pop('exception')
        self.plugin = plugin
        self.ep_name = plugin.name
        self.original_exception = original_exception
        super(FailedToLoadPlugin, self).__init__(*args, **kwargs)

    def __str__(self):
        """Render the load failure as a human-readable message."""
        details = {'name': self.ep_name, 'exc': self.original_exception}
        return self.FORMAT % details
class InvalidSyntax(Flake8Exception):
    """Exception raised when tokenizing a file fails."""

    def __init__(self, *args, **kwargs):
        """Capture the original exception and pre-fill error metadata.

        Requires an ``exception`` keyword argument.
        """
        original = kwargs.pop('exception')
        self.original_exception = original
        self.error_code = 'E902'
        self.line_number = 1
        self.column_number = 0
        if hasattr(original, 'message'):
            self.error_message = original.message
        else:
            # On Python 3, the IOError is an OSError which has a
            # strerror attribute instead of a message attribute
            self.error_message = original.strerror
        super(InvalidSyntax, self).__init__(*args, **kwargs)
class HookInstallationError(Flake8Exception):
    """Common ancestor for every hook-installation error."""
class GitHookAlreadyExists(HookInstallationError):
    """Exception raised when the git pre-commit hook file already exists."""

    def __init__(self, *args, **kwargs):
        """Record which path blocked the installation.

        Requires a ``path`` keyword argument.
        """
        self.path = kwargs.pop('path')
        super(GitHookAlreadyExists, self).__init__(*args, **kwargs)

    def __str__(self):
        """Explain which existing file prevented hook installation."""
        template = ('The Git pre-commit hook ({0}) already exists. To convince '
                    'Flake8 to install the hook, please remove the existing '
                    'hook.')
        return template.format(self.path)
class MercurialHookAlreadyExists(HookInstallationError):
    """Exception raised when a mercurial hook is already configured."""

    # Subclasses set this to the name of the hgrc hook entry involved.
    hook_name = None

    def __init__(self, *args, **kwargs):
        """Record the hgrc path and the conflicting hook value.

        Requires ``path`` and ``value`` keyword arguments.
        """
        path = kwargs.pop('path')
        value = kwargs.pop('value')
        self.path = path
        self.value = value
        super(MercurialHookAlreadyExists, self).__init__(*args, **kwargs)

    def __str__(self):
        """Explain which existing hgrc entry prevented installation."""
        template = ('The Mercurial {0} hook already exists with "{1}" in {2}. '
                    'To convince Flake8 to install the hook, please remove the '
                    '{0} configuration from the [hooks] section of your hgrc.')
        return template.format(self.hook_name, self.value, self.path)
class MercurialCommitHookAlreadyExists(MercurialHookAlreadyExists):
    """Exception raised when the hg commit hook is already configured."""

    # The hgrc [hooks] entry this error refers to.
    hook_name = 'commit'
class MercurialQRefreshHookAlreadyExists(MercurialHookAlreadyExists):
    """Exception raised when the hg qrefresh hook is already configured."""

    # The hgrc [hooks] entry this error refers to.
    hook_name = 'qrefresh'

View file

@@ -0,0 +1 @@
"""Submodule containing the default formatters for Flake8."""

View file

@@ -0,0 +1,161 @@
"""The base class and interface for all formatting plugins."""
from __future__ import print_function
class BaseFormatter(object):
    """Class defining the formatter interface.

    .. attribute:: options

        The options parsed from both configuration files and the command-line.

    .. attribute:: filename

        If specified by the user, the path to store the results of the run.

    .. attribute:: output_fd

        Initialized when the :meth:`start` is called. This will be a file
        object opened for writing.

    .. attribute:: newline

        The string to add to the end of a line. This is only used when the
        output filename has been specified.
    """

    def __init__(self, options):
        """Initialize with the options parsed from config and cli.

        This also calls a hook, :meth:`after_init`, so subclasses do not need
        to call super to call this method.

        :param optparse.Values options:
            User specified configuration parsed from both configuration files
            and the command-line interface.
        """
        self.options = options
        self.filename = options.output_file
        self.output_fd = None
        self.newline = '\n'
        # Subclass customization hook; saves subclasses from overriding
        # __init__ and having to call super.
        self.after_init()

    def after_init(self):
        """Initialize the formatter further."""
        pass

    def start(self):
        """Prepare the formatter to receive input.

        This defaults to initializing :attr:`output_fd` if :attr:`filename`
        is set.
        """
        if self.filename:
            self.output_fd = open(self.filename, 'w')

    def handle(self, error):
        """Handle an error reported by Flake8.

        This defaults to calling :meth:`format`, :meth:`show_source`, and
        then :meth:`write`. To extend how errors are handled, override this
        method.

        :param error:
            This will be an instance of :class:`~flake8.style_guide.Error`.
        :type error:
            flake8.style_guide.Error
        """
        line = self.format(error)
        source = self.show_source(error)
        self.write(line, source)

    def format(self, error):
        """Format an error reported by Flake8.

        This method **must** be implemented by subclasses.

        :param error:
            This will be an instance of :class:`~flake8.style_guide.Error`.
        :type error:
            flake8.style_guide.Error
        :returns:
            The formatted error string.
        :rtype:
            str
        """
        raise NotImplementedError('Subclass of BaseFormatter did not implement'
                                  ' format.')

    def show_benchmarks(self, benchmarks):
        """Format and print the benchmarks."""
        # NOTE(sigmavirus24): The format strings are a little confusing, even
        # to me, so here's a quick explanation:
        # We specify the named value first followed by a ':' to indicate we're
        # formatting the value.
        # Next we use '<' to indicate we want the value left aligned.
        # Then '10' is the width of the area.
        # For floats, finally, we only want at most 3 digits after
        # the decimal point to be displayed. This is the precision and it
        # can not be specified for integers which is why we need two separate
        # format strings.
        # NOTE(review): '.3' with no presentation type is the *general*
        # format, i.e. 3 significant digits, not 3 digits after the decimal
        # point -- confirm which was intended.
        float_format = '{value:<10.3} {statistic}'.format
        int_format = '{value:<10} {statistic}'.format
        for statistic, value in benchmarks:
            if isinstance(value, int):
                benchmark = int_format(statistic=statistic, value=value)
            else:
                benchmark = float_format(statistic=statistic, value=value)
            self._write(benchmark)

    def show_source(self, error):
        """Show the physical line generating the error.

        This also adds an indicator for the particular part of the line that
        is reported as generating the problem.

        :param error:
            This will be an instance of :class:`~flake8.style_guide.Error`.
        :type error:
            flake8.style_guide.Error
        :returns:
            The formatted error string if the user wants to show the source.
            If the user does not want to show the source, this will return
            ``None``.
        :rtype:
            str
        """
        if not self.options.show_source:
            return None
        # NOTE(review): this caret placement assumes error.column_number is
        # 0-indexed; if it is 1-indexed the caret lands one column too far
        # right -- confirm against how Error instances are built.
        pointer = (' ' * error.column_number) + '^'
        # Physical lines have a newline at the end, no need to add an extra
        # one
        return error.physical_line + pointer

    def _write(self, output):
        """Handle logic of whether to use an output file or print()."""
        if self.output_fd is not None:
            # The file branch must append the newline itself; print() adds
            # one automatically in the other branch.
            self.output_fd.write(output + self.newline)
        else:
            print(output)

    def write(self, line, source):
        """Write the line either to the output file or stdout.

        This handles deciding whether to write to a file or print to standard
        out for subclasses. Override this if you want behaviour that differs
        from the default.

        :param str line:
            The formatted string to print or write.
        :param str source:
            The source code that has been formatted and associated with the
            line of output.
        """
        self._write(line)
        if source:
            self._write(source)

    def stop(self):
        """Clean up after reporting is finished."""
        if self.output_fd is not None:
            self.output_fd.close()
            self.output_fd = None

View file

@@ -0,0 +1,56 @@
"""Default formatting class for Flake8."""
from flake8.formatting import base
class SimpleFormatter(base.BaseFormatter):
    """Simple abstraction for Default and Pylint formatter commonality.

    Sub-classes of this need to define an ``error_format`` attribute in order
    to succeed. The ``format`` method relies on that attribute and expects the
    ``error_format`` string to use the old-style formatting strings with named
    parameters:

    * code

    * text

    * path

    * row

    * col
    """

    # Subclasses must override this with an old-style '%(name)s' format
    # string using the parameter names listed above.
    error_format = None

    def format(self, error):
        """Format an error into a single line of output.

        Note: this does not write anything; :meth:`BaseFormatter.handle`
        performs the writing.

        :param error:
            The error to format; must expose ``code``, ``text``,
            ``filename``, ``line_number``, and ``column_number`` attributes.
        :returns:
            The error interpolated into :attr:`error_format`.
        :rtype:
            str
        """
        return self.error_format % {
            "code": error.code,
            "text": error.text,
            "path": error.filename,
            "row": error.line_number,
            "col": error.column_number,
        }
class Default(SimpleFormatter):
    """Default formatter for Flake8.

    This also handles backwards compatibility for people specifying a custom
    format string.
    """

    error_format = '%(path)s:%(row)d:%(col)d: %(code)s %(text)s'

    def after_init(self):
        """Honour a user-supplied format string when one was given."""
        configured = self.options.format
        if configured.lower() != 'default':
            self.error_format = configured
class Pylint(SimpleFormatter):
    """Pylint formatter for Flake8."""

    # NOTE(review): this resembles pylint's "parseable" output (no column in
    # the location prefix) -- confirm against the pylint documentation.
    error_format = '%(path)s:%(row)d: [%(code)s] %(text)s'

View file

@@ -0,0 +1 @@
"""Module containing the logic for the Flake8 entry-points."""

View file

@@ -0,0 +1,296 @@
"""Module containing the application logic for Flake8."""
from __future__ import print_function
import logging
import sys
import time
import flake8
from flake8 import checker
from flake8 import defaults
from flake8 import style_guide
from flake8 import utils
from flake8.main import options
from flake8.options import aggregator
from flake8.options import manager
from flake8.plugins import manager as plugin_manager
LOG = logging.getLogger(__name__)
class Application(object):
    """Abstract our application into a class."""

    def __init__(self, program='flake8', version=flake8.__version__):
        # type: (str, str) -> NoneType
        """Initialize our application.

        :param str program:
            The name of the program/application that we're executing.
        :param str version:
            The version of the program/application we're executing.
        """
        #: The timestamp when the Application instance was instantiated.
        self.start_time = time.time()
        #: The timestamp when the Application finished reported errors.
        self.end_time = None
        #: The name of the program being run
        self.program = program
        #: The version of the program being run
        self.version = version
        #: The instance of :class:`flake8.options.manager.OptionManager` used
        #: to parse and handle the options and arguments passed by the user
        self.option_manager = manager.OptionManager(
            prog='flake8', version=flake8.__version__
        )
        options.register_default_options(self.option_manager)

        # We haven't found or registered our plugins yet, so let's defer
        # printing the version until we aggregate options from config files
        # and the command-line. First, let's clone our arguments on the CLI,
        # then we'll attempt to remove ``--version`` so that we can avoid
        # triggering the "version" action in optparse. If it's not there, we
        # do not need to worry and we can continue. If it is, we successfully
        # defer printing the version until just a little bit later.
        # Similarly we have to defer printing the help text until later.
        # NOTE(review): this preliminary parse always reads ``sys.argv``,
        # even when :meth:`run` is later given an explicit ``argv`` list.
        args = sys.argv[:]
        try:
            args.remove('--version')
        except ValueError:
            pass
        try:
            args.remove('--help')
        except ValueError:
            pass
        try:
            args.remove('-h')
        except ValueError:
            pass

        preliminary_opts, _ = self.option_manager.parse_args(args)
        # Set the verbosity of the program
        flake8.configure_logging(preliminary_opts.verbose,
                                 preliminary_opts.output_file)

        #: The instance of :class:`flake8.plugins.manager.Checkers`
        self.check_plugins = None
        #: The instance of :class:`flake8.plugins.manager.Listeners`
        self.listening_plugins = None
        #: The instance of :class:`flake8.plugins.manager.ReportFormatters`
        self.formatting_plugins = None
        #: The user-selected formatter from :attr:`formatting_plugins`
        self.formatter = None
        #: The :class:`flake8.plugins.notifier.Notifier` for listening plugins
        self.listener_trie = None
        #: The :class:`flake8.style_guide.StyleGuide` built from the user's
        #: options
        self.guide = None
        #: The :class:`flake8.checker.Manager` that will handle running all of
        #: the checks selected by the user.
        self.file_checker_manager = None

        #: The user-supplied options parsed into an instance of
        #: :class:`optparse.Values`
        self.options = None
        #: The left over arguments that were not parsed by
        #: :attr:`option_manager`
        self.args = None
        #: The number of errors, warnings, and other messages after running
        #: flake8 and taking into account ignored errors and lines.
        self.result_count = 0
        #: The total number of errors before accounting for ignored errors and
        #: lines.
        self.total_result_count = 0
        #: Whether the program is processing a diff or not
        self.running_against_diff = False
        #: The parsed diff information
        self.parsed_diff = {}

    def exit(self):
        # type: () -> NoneType
        """Handle finalization and exiting the program.

        This should be the last thing called on the application instance. It
        will check certain options and exit appropriately.
        """
        if self.options.count:
            print(self.result_count)

        if not self.options.exit_zero:
            # SystemExit(True) exits with status 1 when any results were
            # reported; SystemExit(False) exits with status 0.
            raise SystemExit(self.result_count > 0)

    def find_plugins(self):
        # type: () -> NoneType
        """Find and load the plugins for this application.

        If :attr:`check_plugins`, :attr:`listening_plugins`, or
        :attr:`formatting_plugins` are ``None`` then this method will update
        them with the appropriate plugin manager instance. Given the expense
        of finding plugins (via :mod:`pkg_resources`) we want this to be
        idempotent and so only update those attributes if they are ``None``.
        """
        if self.check_plugins is None:
            self.check_plugins = plugin_manager.Checkers()

        if self.listening_plugins is None:
            self.listening_plugins = plugin_manager.Listeners()

        if self.formatting_plugins is None:
            self.formatting_plugins = plugin_manager.ReportFormatters()

        # Loading is always re-triggered; the managers are expected to make
        # this cheap after the first call.
        self.check_plugins.load_plugins()
        self.listening_plugins.load_plugins()
        self.formatting_plugins.load_plugins()

    def register_plugin_options(self):
        # type: () -> NoneType
        """Register options provided by plugins to our option manager."""
        self.check_plugins.register_options(self.option_manager)
        self.check_plugins.register_plugin_versions(self.option_manager)
        self.listening_plugins.register_options(self.option_manager)
        self.formatting_plugins.register_options(self.option_manager)

    def parse_configuration_and_cli(self, argv=None):
        # type: (Union[NoneType, List[str]]) -> NoneType
        """Parse configuration files and the CLI options.

        :param list argv:
            Command-line arguments passed in directly.
        """
        # Only aggregate once; repeated calls keep the first parse result.
        if self.options is None and self.args is None:
            self.options, self.args = aggregator.aggregate_options(
                self.option_manager, argv
            )

        self.running_against_diff = self.options.diff
        if self.running_against_diff:
            # The diff itself is read from standard input.
            self.parsed_diff = utils.parse_unified_diff()

        self.check_plugins.provide_options(self.option_manager, self.options,
                                           self.args)
        self.listening_plugins.provide_options(self.option_manager,
                                               self.options,
                                               self.args)
        self.formatting_plugins.provide_options(self.option_manager,
                                                self.options,
                                                self.args)

    def make_formatter(self):
        # type: () -> NoneType
        """Initialize a formatter based on the parsed options."""
        if self.formatter is None:
            # Fall back to the "default" formatter plugin when the named
            # formatter is not registered.
            self.formatter = self.formatting_plugins.get(
                self.options.format, self.formatting_plugins['default']
            ).execute(self.options)

    def make_notifier(self):
        # type: () -> NoneType
        """Initialize our listener Notifier."""
        if self.listener_trie is None:
            self.listener_trie = self.listening_plugins.build_notifier()

    def make_guide(self):
        # type: () -> NoneType
        """Initialize our StyleGuide."""
        if self.guide is None:
            self.guide = style_guide.StyleGuide(
                self.options, self.listener_trie, self.formatter
            )

        if self.running_against_diff:
            self.guide.add_diff_ranges(self.parsed_diff)

    def make_file_checker_manager(self):
        # type: () -> NoneType
        """Initialize our FileChecker Manager."""
        if self.file_checker_manager is None:
            self.file_checker_manager = checker.Manager(
                style_guide=self.guide,
                arguments=self.args,
                checker_plugins=self.check_plugins,
            )

    def run_checks(self):
        # type: () -> NoneType
        """Run the actual checks with the FileChecker Manager.

        This method encapsulates the logic to make a
        :class:`~flake8.checker.Manger` instance run the checks it is
        managing.
        """
        files = None
        if self.running_against_diff:
            # Restrict checking to the files named in the diff.
            files = list(sorted(self.parsed_diff.keys()))
        self.file_checker_manager.start(files)
        self.file_checker_manager.run()
        LOG.info('Finished running')
        self.file_checker_manager.stop()
        self.end_time = time.time()

    def report_benchmarks(self):
        """Aggregate, calculate, and report benchmarks for this run."""
        if not self.options.benchmark:
            return

        time_elapsed = self.end_time - self.start_time
        statistics = [('seconds elapsed', time_elapsed)]
        add_statistic = statistics.append
        for statistic in (defaults.STATISTIC_NAMES + ('files',)):
            value = self.file_checker_manager.statistics[statistic]
            total_description = 'total ' + statistic + ' processed'
            add_statistic((total_description, value))
            per_second_description = statistic + ' processed per second'
            add_statistic((per_second_description, int(value / time_elapsed)))

        self.formatter.show_benchmarks(statistics)

    def report_errors(self):
        # type: () -> NoneType
        """Report all the errors found by flake8 3.0.

        This also updates the :attr:`result_count` attribute with the total
        number of errors, warnings, and other messages found.
        """
        LOG.info('Reporting errors')
        results = self.file_checker_manager.report()
        self.total_result_count, self.result_count = results
        LOG.info('Found a total of %d results and reported %d',
                 self.total_result_count, self.result_count)

    def initialize(self, argv):
        # type: () -> NoneType
        """Initialize the application to be run.

        This finds the plugins, registers their options, and parses the
        command-line arguments.
        """
        self.find_plugins()
        self.register_plugin_options()
        self.parse_configuration_and_cli(argv)
        self.make_formatter()
        self.make_notifier()
        self.make_guide()
        self.make_file_checker_manager()

    def _run(self, argv):
        # type: (Union[NoneType, List[str]]) -> NoneType
        self.initialize(argv)
        self.run_checks()
        self.report_errors()
        self.report_benchmarks()

    def run(self, argv=None):
        # type: (Union[NoneType, List[str]]) -> NoneType
        """Run our application.

        This method will also handle KeyboardInterrupt exceptions for the
        entirety of the flake8 application. If it sees a KeyboardInterrupt it
        will forcibly clean up the :class:`~flake8.checker.Manager`.
        """
        try:
            self._run(argv)
        except KeyboardInterrupt as exc:
            LOG.critical('Caught keyboard interrupt from user')
            LOG.exception(exc)
            self.file_checker_manager._force_cleanup()
            # NOTE(review): the interrupt is swallowed here rather than
            # re-raised, so control returns to the caller's normal exit
            # path -- confirm this is intended.

17
src/flake8/main/cli.py Normal file
View file

@ -0,0 +1,17 @@
"""Command-line implementation of flake8."""
from flake8.main import application
def main(argv=None):
    # type: (Union[NoneType, List[str]]) -> NoneType
    """Execute the flake8 command-line tool.

    Builds an :class:`Application`, runs it over ``argv``, and finalizes
    via :meth:`Application.exit`.

    :param list argv:
        The arguments to be passed to the application for parsing.
    """
    flake8_app = application.Application()
    flake8_app.run(argv)
    flake8_app.exit()

207
src/flake8/main/git.py Normal file
View file

@ -0,0 +1,207 @@
"""Module containing the main git hook interface and helpers.
.. autofunction:: hook
.. autofunction:: install
"""
import contextlib
import os
import shutil
import stat
import subprocess
import sys
import tempfile
from flake8 import defaults
from flake8 import exceptions
__all__ = ('hook', 'install')
def hook(lazy=False, strict=False):
    """Execute Flake8 on the files in git's index.

    Determine which files are about to be committed and run Flake8 over them
    to check for violations.

    :param bool lazy:
        Find files not added to the index prior to committing. This is useful
        if you frequently use ``git commit -a`` for example. This defaults to
        False since it will otherwise include files not in the index.
    :param bool strict:
        If True, return the total number of errors/violations found by Flake8.
        This will cause the hook to fail.
    :returns:
        Total number of errors found during the run.
    :rtype:
        int
    """
    # NOTE(sigmavirus24): Delay import of application until we need it.
    from flake8.main import application
    app = application.Application()
    with make_temporary_directory() as tempdir:
        # Check a snapshot of the staged contents, not the working tree.
        filepaths = list(copy_indexed_files_to(tempdir, lazy))
        app.initialize(filepaths)
        app.run_checks()

    app.report_errors()
    if strict:
        return app.result_count
    return 0
def install():
    """Install the git hook script.

    This searches for the ``.git`` directory and will install an executable
    pre-commit python script in the hooks sub-directory if one does not
    already exist.

    :returns:
        True if successful, False if the git directory doesn't exist.
    :rtype:
        bool
    :raises:
        flake8.exceptions.GitHookAlreadyExists
    """
    git_directory = find_git_directory()
    if git_directory is None or not os.path.exists(git_directory):
        return False

    hooks_directory = os.path.join(git_directory, 'hooks')
    if not os.path.exists(hooks_directory):
        os.mkdir(hooks_directory)

    pre_commit_file = os.path.abspath(
        os.path.join(hooks_directory, 'pre-commit')
    )
    if os.path.exists(pre_commit_file):
        # Never clobber a user's existing hook.
        raise exceptions.GitHookAlreadyExists(
            'File already exists',
            path=pre_commit_file,
        )

    executable = get_executable()
    with open(pre_commit_file, 'w') as fd:
        fd.write(_HOOK_TEMPLATE.format(executable=executable))

    # NOTE(sigmavirus24): The following sets:
    # - read, write, and execute permissions for the owner
    # - read permissions for people in the group
    # - read permissions for other people
    # The owner needs the file to be readable, writable, and executable
    # so that git can actually execute it as a hook.
    pre_commit_permissions = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH
    os.chmod(pre_commit_file, pre_commit_permissions)
    return True
def get_executable():
    """Return the Python interpreter path to embed in the hook script."""
    # Fall back to a portable shebang when the interpreter path is unknown.
    return sys.executable if sys.executable is not None else '/usr/bin/env python'
def find_git_directory():
    """Return the path to the git directory, or None outside a repository."""
    rev_parse = piped_process(['git', 'rev-parse', '--git-dir'])
    (stdout, _) = rev_parse.communicate()
    stdout = to_text(stdout)
    if rev_parse.returncode == 0:
        # Strip the trailing newline from git's output.
        return stdout.strip()
    return None
def copy_indexed_files_to(temporary_directory, lazy):
    """Snapshot each staged file into *temporary_directory*.

    Yields the path of each copied file inside the temporary directory.
    """
    modified_files = find_modified_files(lazy)
    for filename in modified_files:
        # Use the staged (index) contents, not the working-tree contents.
        contents = get_staged_contents_from(filename)
        yield copy_file_to(temporary_directory, filename, contents)
def copy_file_to(destination_directory, filepath, contents):
    """Write *contents* under *destination_directory*, mirroring the path.

    Returns the path of the newly-written file.
    """
    directory, filename = os.path.split(os.path.abspath(filepath))
    temporary_directory = make_temporary_directory_from(destination_directory,
                                                        directory)
    if not os.path.exists(temporary_directory):
        os.makedirs(temporary_directory)
    temporary_filepath = os.path.join(temporary_directory, filename)
    # Contents come from ``git show`` as bytes, so write in binary mode.
    with open(temporary_filepath, 'wb') as fd:
        fd.write(contents)
    return temporary_filepath
def make_temporary_directory_from(destination, directory):
    """Map *directory* into *destination*, stripping their common prefix."""
    shared_prefix = os.path.commonprefix([directory, destination])
    relative_path = os.path.relpath(directory, start=shared_prefix)
    return os.path.join(destination, relative_path)
def find_modified_files(lazy):
    """Return the names of files staged for commit.

    NOTE(review): ``lazy`` is accepted but unused in this body -- lazy mode
    appears unimplemented here; confirm against the caller's expectations.
    """
    diff_index = piped_process(
        ['git', 'diff-index', '--cached', '--name-only',
         '--diff-filter=ACMRTUXB', 'HEAD'],
    )
    (stdout, _) = diff_index.communicate()
    stdout = to_text(stdout)
    return stdout.splitlines()
def get_staged_contents_from(filename):
    """Return the staged (index) contents of *filename* as bytes."""
    # ``git show :<path>`` reads the blob for the path from the index.
    git_show = piped_process(['git', 'show', ':{0}'.format(filename)])
    (stdout, _) = git_show.communicate()
    return stdout
@contextlib.contextmanager
def make_temporary_directory():
    """Yield a scratch directory, removing it when the block exits.

    Cleanup happens in a ``finally`` so the directory is removed even when
    the managed block raises (the previous version leaked it on error).
    """
    temporary_directory = tempfile.mkdtemp()
    try:
        yield temporary_directory
    finally:
        shutil.rmtree(temporary_directory, ignore_errors=True)
def to_text(string):
    """Ensure that the string is text (decoding bytes as UTF-8)."""
    decode = getattr(string, 'decode', None)
    if callable(decode):
        return decode('utf-8')
    return string
def piped_process(command):
    """Spawn *command* with stdout and stderr captured via pipes."""
    return subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
def git_config_for(parameter):
    """Read *parameter* from git config, canonicalized as a bool string."""
    # ``--bool`` makes git normalize the value to "true"/"false".
    config = piped_process(['git', 'config', '--get', '--bool', parameter])
    (stdout, _) = config.communicate()
    return to_text(stdout)
def config_for(parameter):
    """Return the hook setting from the environment or git config as a bool.

    Checks ``FLAKE8_<PARAMETER>`` in the environment first, falling back to
    the ``flake8.<parameter>`` git config value.
    """
    environment_variable = 'flake8_{0}'.format(parameter).upper()
    git_variable = 'flake8.{0}'.format(parameter)
    # NOTE(review): the default argument is evaluated eagerly, so git config
    # is consulted even when the environment variable is set.
    value = os.environ.get(environment_variable, git_config_for(git_variable))
    return value.lower() in defaults.TRUTHY_VALUES
_HOOK_TEMPLATE = """#!{executable}
import os
import sys
from flake8.main import git
if __name__ == '__main__':
sys.exit(
git.hook(
strict=git.config_for('strict'),
lazy=git.config_for('lazy'),
)
)
"""

View file

@ -0,0 +1,128 @@
"""Module containing the main mecurial hook interface and helpers.
.. autofunction:: hook
.. autofunction:: install
"""
import configparser
import os
import subprocess
from flake8 import exceptions as exc
__all__ = ('hook', 'install')
def hook(ui, repo, **kwargs):
    """Execute Flake8 on the repository provided by Mercurial.

    To understand the parameters read more of the Mercurial documentation
    around Hooks: https://www.mercurial-scm.org/wiki/Hook.

    We avoid using the ``ui`` attribute because it can cause issues with
    the GPL license that Mercurial is under. We don't import it, but we
    avoid using it all the same.

    :returns:
        The number of reported results when strict, otherwise 0.
    :rtype:
        int
    """
    # NOTE(sigmavirus24): Delay import of application until we need it.
    from flake8.main import application
    hgrc = find_hgrc(create_if_missing=False)
    if hgrc is None:
        print('Cannot locate your root mercurial repository.')
        raise SystemExit(True)

    hgconfig = configparser_for(hgrc)
    # Use getboolean so a configured value of "false"/"no"/"0" actually
    # disables strict mode; a plain get() would return a non-empty string,
    # which is always truthy.
    strict = hgconfig.getboolean('flake8', 'strict', fallback=True)

    filenames = list(get_filenames_from(repo, kwargs))

    app = application.Application()
    app.run(filenames)
    if strict:
        return app.result_count
    return 0
def install():
    """Ensure that the mercurial hooks are installed.

    :returns:
        True on success, False when no hgrc can be located.
    :rtype:
        bool
    :raises:
        flake8.exceptions.MercurialCommitHookAlreadyExists
    :raises:
        flake8.exceptions.MercurialQRefreshHookAlreadyExists
    """
    hgrc = find_hgrc(create_if_missing=True)
    if hgrc is None:
        return False

    hgconfig = configparser_for(hgrc)

    if not hgconfig.has_section('hooks'):
        hgconfig.add_section('hooks')

    # Never clobber hooks the user already configured.
    if hgconfig.has_option('hooks', 'commit'):
        raise exc.MercurialCommitHookAlreadyExists(
            path=hgrc,
            value=hgconfig.get('hooks', 'commit'),
        )

    if hgconfig.has_option('hooks', 'qrefresh'):
        raise exc.MercurialQRefreshHookAlreadyExists(
            path=hgrc,
            value=hgconfig.get('hooks', 'qrefresh'),
        )

    hgconfig.set('hooks', 'commit', 'python:flake8.main.mercurial.hook')
    hgconfig.set('hooks', 'qrefresh', 'python:flake8.main.mercurial.hook')

    if not hgconfig.has_section('flake8'):
        hgconfig.add_section('flake8')

    if not hgconfig.has_option('flake8', 'strict'):
        # configparser only accepts string values; passing the bool False
        # raises TypeError on write.
        hgconfig.set('flake8', 'strict', 'false')

    with open(hgrc, 'w') as fd:
        hgconfig.write(fd)

    return True
def get_filenames_from(repository, kwargs):
    """Yield the unique, existing ``.py`` files touched since the hook node.

    # NOTE(review): iterates revisions from ``repository[node]`` through the
    # repository tip -- presumably the changesets being committed; confirm
    # against Mercurial's hook contract.
    """
    seen_filenames = set()
    node = kwargs['node']
    for revision in range(repository[node], len(repository)):
        for filename in repository[revision].files():
            full_filename = os.path.join(repository.root, filename)
            have_seen_filename = full_filename in seen_filenames
            filename_does_not_exist = not os.path.exists(full_filename)
            # Skip duplicates and files removed from the working directory.
            if have_seen_filename or filename_does_not_exist:
                continue

            seen_filenames.add(full_filename)
            if full_filename.endswith('.py'):
                yield full_filename
def find_hgrc(create_if_missing=False):
    """Find the ``.hg/hgrc`` file for the current repository.

    :param bool create_if_missing:
        Create an empty hgrc when the repository exists but has none.
    :returns:
        Absolute path to the hgrc, or None when not inside a repository
        (or when the hgrc is missing and ``create_if_missing`` is False).
    """
    root = subprocess.Popen(
        ['hg', 'root'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    (hg_directory, _) = root.communicate()
    if callable(getattr(hg_directory, 'decode', None)):
        hg_directory = hg_directory.decode('utf-8')

    # ``hg root`` terminates its output with a newline; without stripping it
    # the isdir check below always fails.
    hg_directory = hg_directory.strip()
    if not os.path.isdir(hg_directory):
        return None

    hgrc = os.path.abspath(
        os.path.join(hg_directory, '.hg', 'hgrc')
    )
    if not os.path.exists(hgrc):
        if create_if_missing:
            open(hgrc, 'w').close()
        else:
            return None

    return hgrc
def configparser_for(path):
    """Return a ConfigParser (interpolation disabled) loaded from *path*."""
    parsed_config = configparser.ConfigParser(interpolation=None)
    parsed_config.read(path)
    return parsed_config

201
src/flake8/main/options.py Normal file
View file

@ -0,0 +1,201 @@
"""Contains the logic for all of the default options for Flake8."""
from flake8 import defaults
from flake8.main import vcs
def register_default_options(option_manager):
    """Register the default options on our OptionManager.

    The default options include:

    - ``-v``/``--verbose``
    - ``-q``/``--quiet``
    - ``--count``
    - ``--diff``
    - ``--exclude``
    - ``--filename``
    - ``--stdin-display-name``
    - ``--format``
    - ``--hang-closing``
    - ``--ignore``
    - ``--max-line-length``
    - ``--select``
    - ``--disable-noqa``
    - ``--show-source``
    - ``--statistics``
    - ``--enable-extensions``
    - ``--exit-zero``
    - ``--install-hook``
    - ``-j``/``--jobs``
    - ``--output-file``
    - ``--append-config``
    - ``--config``
    - ``--isolated``
    - ``--benchmark``
    """
    add_option = option_manager.add_option

    # pep8 options
    add_option(
        '-v', '--verbose', default=0, action='count',
        parse_from_config=True,
        help='Print more information about what is happening in flake8.'
             ' This option is repeatable and will increase verbosity each '
             'time it is repeated.',
    )
    add_option(
        '-q', '--quiet', default=0, action='count',
        parse_from_config=True,
        help='Report only file names, or nothing. This option is repeatable.',
    )

    add_option(
        '--count', action='store_true', parse_from_config=True,
        help='Print total number of errors and warnings to standard error and'
             ' set the exit code to 1 if total is not empty.',
    )

    add_option(
        '--diff', action='store_true',
        help='Report changes only within line number ranges in the unified '
             'diff provided on standard in by the user.',
    )

    add_option(
        '--exclude', metavar='patterns', default=defaults.EXCLUDE,
        comma_separated_list=True, parse_from_config=True,
        normalize_paths=True,
        help='Comma-separated list of files or directories to exclude.'
             ' (Default: %default)',
    )

    add_option(
        '--filename', metavar='patterns', default='*.py',
        parse_from_config=True, comma_separated_list=True,
        help='Only check for filenames matching the patterns in this comma-'
             'separated list. (Default: %default)',
    )

    add_option(
        '--stdin-display-name', default='stdin',
        help='The name used when reporting errors from code passed via stdin.'
             ' This is useful for editors piping the file contents to flake8.'
             ' (Default: %default)',
    )

    # TODO(sigmavirus24): Figure out --first/--repeat

    # NOTE(sigmavirus24): We can't use choices for this option since users can
    # freely provide a format string and that will break if we restrict their
    # choices.
    add_option(
        '--format', metavar='format', default='default',
        parse_from_config=True,
        help='Format errors according to the chosen formatter.',
    )

    add_option(
        '--hang-closing', action='store_true', parse_from_config=True,
        help='Hang closing bracket instead of matching indentation of opening'
             " bracket's line.",
    )

    add_option(
        '--ignore', metavar='errors', default=defaults.IGNORE,
        parse_from_config=True, comma_separated_list=True,
        help='Comma-separated list of errors and warnings to ignore (or skip).'
             ' For example, ``--ignore=E4,E51,W234``. (Default: %default)',
    )

    add_option(
        '--max-line-length', type='int', metavar='n',
        default=defaults.MAX_LINE_LENGTH, parse_from_config=True,
        help='Maximum allowed line length for the entirety of this run. '
             '(Default: %default)',
    )

    add_option(
        '--select', metavar='errors', default=defaults.SELECT,
        parse_from_config=True, comma_separated_list=True,
        help='Comma-separated list of errors and warnings to enable.'
             ' For example, ``--select=E4,E51,W234``. (Default: %default)',
    )

    add_option(
        '--disable-noqa', default=False, parse_from_config=True,
        action='store_true',
        help='Disable the effect of "# noqa". This will report errors on '
             'lines with "# noqa" at the end.'
    )

    # TODO(sigmavirus24): Decide what to do about --show-pep8

    add_option(
        '--show-source', action='store_true', parse_from_config=True,
        help='Show the source that generated each error or warning.',
    )

    add_option(
        '--statistics', action='store_true', parse_from_config=True,
        help='Count errors and warnings.',
    )

    # Flake8 options
    add_option(
        '--enable-extensions', default='', parse_from_config=True,
        comma_separated_list=True, type='string',
        help='Enable plugins and extensions that are otherwise disabled '
             'by default',
    )

    add_option(
        '--exit-zero', action='store_true',
        help='Exit with status code "0" even if there are errors.',
    )

    add_option(
        '--install-hook', action='callback', type='choice',
        choices=vcs.choices(), callback=vcs.install,
        help='Install a hook that is run prior to a commit for the supported '
             'version control system.'
    )

    add_option(
        '-j', '--jobs', type='string', default='auto', parse_from_config=True,
        help='Number of subprocesses to use to run checks in parallel. '
             'This is ignored on Windows. The default, "auto", will '
             'auto-detect the number of processors available to use.'
             ' (Default: %default)',
    )

    add_option(
        '--output-file', default=None, type='string', parse_from_config=True,
        help='Redirect report to a file.',
    )

    # Config file options

    add_option(
        '--append-config', action='append',
        help='Provide extra config files to parse in addition to the files '
             'found by Flake8 by default. These files are the last ones read '
             'and so they take the highest precedence when multiple files '
             'provide the same option.',
    )

    add_option(
        '--config', default=None,
        help='Path to the config file that will be the authoritative config '
             'source. This will cause Flake8 to ignore all other '
             'configuration files.'
    )

    add_option(
        '--isolated', default=False, action='store_true',
        help='Ignore all found configuration files.',
    )

    # Benchmarking

    add_option(
        '--benchmark', default=False, action='store_true',
        help='Print benchmark information about this run of Flake8',
    )

View file

@ -0,0 +1,77 @@
"""The logic for Flake8's integration with setuptools."""
import os
import setuptools
from flake8.main import application as app
class Flake8(setuptools.Command):
    """Run Flake8 via setuptools/distutils for registered modules."""

    #: Short description shown by ``python setup.py --help-commands``.
    description = 'Run Flake8 on modules registered in setup.py'
    # NOTE(sigmavirus24): If we populated this with a list of tuples, users
    # could do something like ``python setup.py flake8 --ignore=E123,E234``
    # but we would have to redefine it and we can't define it dynamically.
    # Since I refuse to copy-and-paste the options here or maintain two lists
    # of options, and since this will break when users use plugins that
    # provide command-line options, we are leaving this empty. If users want
    # to configure this command, they can do so through config files.
    user_options = []

    def initialize_options(self):
        """Override this method to initialize our application."""
        pass

    def finalize_options(self):
        """Override this to parse the parameters."""
        pass

    def package_files(self):
        """Collect the files/dirs included in the registered modules."""
        seen_package_directories = ()
        directories = self.distribution.package_dir or {}
        empty_directory_exists = '' in directories
        packages = self.distribution.packages or []
        for package in packages:
            package_directory = package
            if package in directories:
                package_directory = directories[package]
            elif empty_directory_exists:
                package_directory = os.path.join(directories[''],
                                                 package_directory)

            # NOTE(sigmavirus24): Do not collect submodules, e.g.,
            # if we have:
            #  - flake8/
            #  - flake8/plugins/
            # Flake8 only needs ``flake8/`` to be provided. It will
            # recurse on its own.
            if package_directory.startswith(seen_package_directories):
                continue

            seen_package_directories += (package_directory,)
            yield package_directory

    def module_files(self):
        """Collect the files listed as py_modules."""
        modules = self.distribution.py_modules or []
        filename_from = '{0}.py'.format
        for module in modules:
            yield filename_from(module)

    def distribution_files(self):
        """Collect package and module files."""
        for package in self.package_files():
            yield package

        for module in self.module_files():
            yield module

        # Always check the setup script itself.
        yield 'setup.py'

    def run(self):
        """Run the Flake8 application."""
        flake8 = app.Application()
        flake8.run(list(self.distribution_files()))
        flake8.exit()

39
src/flake8/main/vcs.py Normal file
View file

@ -0,0 +1,39 @@
"""Module containing some of the logic for our VCS installation logic."""
from flake8 import exceptions as exc
from flake8.main import git
from flake8.main import mercurial
# NOTE(sigmavirus24): In the future, we may allow for VCS hooks to be defined
# as plugins, e.g., adding a flake8.vcs entry-point. In that case, this
# dictionary should disappear, and this module might contain more code for
# managing those bits (in conjunction with flake8.plugins.manager).
#: Mapping of supported VCS name to its hook-installation function.
_INSTALLERS = {
    'git': git.install,
    'mercurial': mercurial.install,
}
def install(option, option_string, value, parser):
    """Determine which version control hook to install.

    For more information about the callback signature, see:
    https://docs.python.org/2/library/optparse.html#optparse-option-callbacks
    """
    # ``value`` is constrained by optparse's choices to the keys of
    # _INSTALLERS, so ``installer`` should never be None here.
    installer = _INSTALLERS.get(value)
    errored = False
    successful = False
    try:
        successful = installer()
    except exc.HookInstallationError as hook_error:
        print(str(hook_error))
        errored = True

    if not successful:
        print('Could not find the {0} directory'.format(value))

    # NOTE(review): this exits non-zero only when the installer both failed
    # AND raised; a plain "directory not found" failure exits 0 -- confirm
    # this is the intended exit-status behavior.
    raise SystemExit(not successful and errored)
def choices():
    """Return the list of VCS choices."""
    return [vcs_name for vcs_name in _INSTALLERS]

View file

@ -0,0 +1,12 @@
"""Package containing the option manager and config management logic.
- :mod:`flake8.options.config` contains the logic for finding, parsing, and
merging configuration files.
- :mod:`flake8.options.manager` contains the logic for managing customized
Flake8 command-line and configuration options.
- :mod:`flake8.options.aggregator` uses objects from both of the above modules
to aggregate configuration into one object used by plugins and Flake8.
"""

View file

@ -0,0 +1,74 @@
"""Aggregation function for CLI specified options and config file options.
This holds the logic that uses the collected and merged config files and
applies the user-specified command-line configuration on top of it.
"""
import logging
from flake8 import utils
from flake8.options import config
LOG = logging.getLogger(__name__)
def aggregate_options(manager, arglist=None, values=None):
    """Aggregate and merge CLI and config file options.

    :param flake8.option.manager.OptionManager manager:
        The instance of the OptionManager that we're presently using.
    :param list arglist:
        The list of arguments to pass to ``manager.parse_args``. In most cases
        this will be None so ``parse_args`` uses ``sys.argv``. This is mostly
        available to make testing easier.
    :param optparse.Values values:
        Previously parsed set of parsed options.
    :returns:
        Tuple of the parsed options and extra arguments returned by
        ``manager.parse_args``.
    :rtype:
        tuple(optparse.Values, list)
    """
    # Get defaults from the option parser
    default_values, _ = manager.parse_args([], values=values)
    # Get original CLI values so we can find additional config file paths and
    # see if --config was specified.
    original_values, original_args = manager.parse_args(arglist)
    extra_config_files = utils.normalize_paths(original_values.append_config)

    # Make our new configuration file mergerator
    config_parser = config.MergedConfigParser(
        option_manager=manager,
        extra_config_files=extra_config_files,
        args=original_args,
    )

    # Get the parsed config
    parsed_config = config_parser.parse(original_values.config,
                                        original_values.isolated)

    # Extend the default ignore value with the extended default ignore list,
    # registered by plugins.
    extended_default_ignore = manager.extended_default_ignore.copy()
    LOG.debug('Extended default ignore list: %s',
              list(extended_default_ignore))
    extended_default_ignore.update(default_values.ignore)
    default_values.ignore = list(extended_default_ignore)
    LOG.debug('Merged default ignore list: %s', default_values.ignore)

    # Merge values parsed from config onto the default values returned
    for config_name, value in parsed_config.items():
        dest_name = config_name
        # If the config name is somehow different from the destination name,
        # fetch the destination name from our Option
        if not hasattr(default_values, config_name):
            dest_name = config_parser.config_options[config_name].dest

        LOG.debug('Overriding default value of (%s) for "%s" with (%s)',
                  getattr(default_values, dest_name, None),
                  dest_name,
                  value)
        # Override the default values with the config values
        setattr(default_values, dest_name, value)

    # Finally parse the command-line options so CLI flags win over config.
    return manager.parse_args(arglist, default_values)

View file

@ -0,0 +1,279 @@
"""Config handling logic for Flake8."""
import configparser
import logging
import os.path
import sys
LOG = logging.getLogger(__name__)
__all__ = ('ConfigFileFinder', 'MergedConfigParser')
class ConfigFileFinder(object):
    """Encapsulate the logic for finding and reading config files."""

    #: Default project-level filenames.
    #: NOTE(review): the instance attribute ``project_filenames`` (which adds
    #: '.<program_name>') is what the methods below actually use -- confirm
    #: whether this class attribute is still needed.
    PROJECT_FILENAMES = ('setup.cfg', 'tox.ini')

    def __init__(self, program_name, args, extra_config_files):
        """Initialize object to find config files.

        :param str program_name:
            Name of the current program (e.g., flake8).
        :param list args:
            The extra arguments passed on the command-line.
        :param list extra_config_files:
            Extra configuration files specified by the user to read.
        """
        # The values of --append-config from the CLI
        extra_config_files = extra_config_files or []
        self.extra_config_files = [
            # Ensure the paths are absolute paths for local_config_files
            os.path.abspath(f) for f in extra_config_files
        ]

        # Platform specific settings
        self.is_windows = sys.platform == 'win32'
        self.xdg_home = os.environ.get('XDG_CONFIG_HOME',
                                       os.path.expanduser('~/.config'))

        # Look for '.<program_name>' files
        self.program_config = '.' + program_name
        self.program_name = program_name

        # List of filenames to find in the local/project directory
        self.project_filenames = ('setup.cfg', 'tox.ini', self.program_config)

        self.local_directory = os.path.abspath(os.curdir)

        if not args:
            args = ['.']
        # Common ancestor of the paths being checked; used as the starting
        # point when walking upwards for local config files.
        self.parent = self.tail = os.path.abspath(os.path.commonprefix(args))

    @staticmethod
    def _read_config(files):
        # Returns (parser, list-of-files-actually-read); parse errors are
        # logged and reported as no files found.
        config = configparser.RawConfigParser()
        try:
            found_files = config.read(files)
        except configparser.ParsingError:
            LOG.exception("There was an error trying to parse a config "
                          "file. The files we were attempting to parse "
                          "were: %r", files)
            found_files = []
        return (config, found_files)

    def cli_config(self, files):
        """Read and parse the config file specified on the command-line."""
        config, found_files = self._read_config(files)
        if found_files:
            LOG.debug('Found cli configuration files: %s', found_files)
        return config

    def generate_possible_local_files(self):
        """Find and generate all local config files."""
        tail = self.tail
        parent = self.parent
        local_dir = self.local_directory

        # Walk from the common-prefix directory up towards the filesystem
        # root, stopping once the current working directory is reached.
        while tail:
            for project_filename in self.project_filenames:
                filename = os.path.abspath(os.path.join(parent,
                                                        project_filename))
                yield filename
            if parent == local_dir:
                break
            (parent, tail) = os.path.split(parent)

    def local_config_files(self):
        """Find all local config files which actually exist.

        Filter results from
        :meth:`~ConfigFileFinder.generate_possible_local_files` based
        on whether the filename exists or not.

        :returns:
            List of files that exist that are local project config files with
            extra config files appended to that list (which also exist).
        :rtype:
            [str]
        """
        exists = os.path.exists
        return [
            filename
            for filename in self.generate_possible_local_files()
            if os.path.exists(filename)
        ] + [f for f in self.extra_config_files if exists(f)]

    def local_configs(self):
        """Parse all local config files into one config object."""
        config, found_files = self._read_config(self.local_config_files())
        if found_files:
            LOG.debug('Found local configuration files: %s', found_files)
        return config

    def user_config_file(self):
        """Find the user-level config file."""
        if self.is_windows:
            # e.g. ~\.flake8 on Windows; XDG path elsewhere.
            return os.path.expanduser('~\\' + self.program_config)
        return os.path.join(self.xdg_home, self.program_name)

    def user_config(self):
        """Parse the user config file into a config object."""
        config, found_files = self._read_config(self.user_config_file())
        if found_files:
            LOG.debug('Found user configuration files: %s', found_files)
        return config
class MergedConfigParser(object):
    """Encapsulate merging different types of configuration files.

    This parses out the options registered that were specified in the
    configuration files, handles extra configuration files, and returns
    dictionaries with the parsed values.
    """
    #: Set of types that should use the
    #: :meth:`~configparser.RawConfigParser.getint` method.
    GETINT_TYPES = set(['int', 'count'])
    #: Set of actions that should use the
    #: :meth:`~configparser.RawConfigParser.getboolean` method.
    GETBOOL_ACTIONS = set(['store_true', 'store_false'])
    def __init__(self, option_manager, extra_config_files=None, args=None):
        """Initialize the MergedConfigParser instance.

        :param flake8.option.manager.OptionManager option_manager:
            Initialized OptionManager.
        :param list extra_config_files:
            List of extra config files to parse.
        :param list args:
            The extra parsed arguments from the command-line.
        """
        #: Our instance of flake8.options.manager.OptionManager
        self.option_manager = option_manager
        #: The prog value for the cli parser
        self.program_name = option_manager.program_name
        #: Parsed extra arguments
        self.args = args
        #: Mapping of configuration option names to
        #: :class:`~flake8.options.manager.Option` instances
        self.config_options = option_manager.config_options_dict
        #: List of extra config files
        self.extra_config_files = extra_config_files or []
        #: Our instance of our :class:`~ConfigFileFinder`
        self.config_finder = ConfigFileFinder(self.program_name, self.args,
                                              self.extra_config_files)
    @staticmethod
    def _normalize_value(option, value):
        # Delegate normalization (comma-splitting, path expansion) to the
        # Option itself and log the result to aid debugging.
        final_value = option.normalize(value)
        LOG.debug('%r has been normalized to %r for option "%s"',
                  value, final_value, option.config_name)
        return final_value
    def _parse_config(self, config_parser):
        # Build {option_name: normalized value} for every *registered*
        # option that appears in the parser's program section; unknown
        # options are logged and skipped rather than treated as errors.
        config_dict = {}
        for option_name in config_parser.options(self.program_name):
            if option_name not in self.config_options:
                LOG.debug('Option "%s" is not registered. Ignoring.',
                          option_name)
                continue
            option = self.config_options[option_name]
            # Use the appropriate method to parse the config value
            method = config_parser.get
            if option.type in self.GETINT_TYPES:
                method = config_parser.getint
            elif option.action in self.GETBOOL_ACTIONS:
                method = config_parser.getboolean
            value = method(self.program_name, option_name)
            LOG.debug('Option "%s" returned value: %r', option_name, value)
            final_value = self._normalize_value(option, value)
            config_dict[option_name] = final_value
        return config_dict
    def is_configured_by(self, config):
        """Check if the specified config parser has an appropriate section."""
        return config.has_section(self.program_name)
    def parse_local_config(self):
        """Parse and return the local configuration files."""
        config = self.config_finder.local_configs()
        if not self.is_configured_by(config):
            LOG.debug('Local configuration files have no %s section',
                      self.program_name)
            return {}
        LOG.debug('Parsing local configuration files.')
        return self._parse_config(config)
    def parse_user_config(self):
        """Parse and return the user configuration files."""
        config = self.config_finder.user_config()
        if not self.is_configured_by(config):
            LOG.debug('User configuration files have no %s section',
                      self.program_name)
            return {}
        LOG.debug('Parsing user configuration files.')
        return self._parse_config(config)
    def parse_cli_config(self, config_path):
        """Parse and return the file specified by --config."""
        config = self.config_finder.cli_config(config_path)
        if not self.is_configured_by(config):
            LOG.debug('CLI configuration files have no %s section',
                      self.program_name)
            return {}
        LOG.debug('Parsing CLI configuration files.')
        return self._parse_config(config)
    def merge_user_and_local_config(self):
        """Merge the parsed user and local configuration files.

        :returns:
            Dictionary of the parsed and merged configuration options.
        :rtype:
            dict
        """
        user_config = self.parse_user_config()
        config = self.parse_local_config()
        # Local values win; user-level values only fill in missing keys.
        for option, value in user_config.items():
            config.setdefault(option, value)
        return config
    def parse(self, cli_config=None, isolated=False):
        """Parse and return the local and user config files.

        First this copies over the parsed local configuration and then
        iterates over the options in the user configuration and sets them if
        they were not set by the local configuration file.

        :param str cli_config:
            Value of --config when specified at the command-line. Overrides
            all other config files.
        :param bool isolated:
            Determines if we should parse configuration files at all or not.
            If running in isolated mode, we ignore all configuration files.
        :returns:
            Dictionary of parsed configuration options
        :rtype:
            dict
        """
        if isolated:
            LOG.debug('Refusing to parse configuration files due to user-'
                      'requested isolation')
            return {}
        if cli_config:
            LOG.debug('Ignoring user and locally found configuration files. '
                      'Reading only configuration from "%s" specified via '
                      '--config by the user', cli_config)
            return self.parse_cli_config(cli_config)
        return self.merge_user_and_local_config()

View file

@ -0,0 +1,256 @@
"""Option handling and Option management logic."""
import logging
import optparse # pylint: disable=deprecated-module
from flake8 import utils
LOG = logging.getLogger(__name__)
class Option(object):
    """Our wrapper around an optparse.Option object to add features."""
    def __init__(self, short_option_name=None, long_option_name=None,
                 # Options below here are taken from the optparse.Option class
                 action=None, default=None, type=None, dest=None,
                 nargs=None, const=None, choices=None, callback=None,
                 callback_args=None, callback_kwargs=None, help=None,
                 metavar=None,
                 # Options below here are specific to Flake8
                 parse_from_config=False, comma_separated_list=False,
                 normalize_paths=False):
        """Initialize an Option instance wrapping optparse.Option.

        The following are all passed directly through to optparse.

        :param str short_option_name:
            The short name of the option (e.g., ``-x``). This will be the
            first argument passed to :class:`~optparse.Option`.
        :param str long_option_name:
            The long name of the option (e.g., ``--xtra-long-option``). This
            will be the second argument passed to :class:`~optparse.Option`.
        :param str action:
            Any action allowed by :mod:`optparse`.
        :param default:
            Default value of the option.
        :param type:
            Any type allowed by :mod:`optparse`.
        :param dest:
            Attribute name to store parsed option value as.
        :param nargs:
            Number of arguments to parse for this option.
        :param const:
            Constant value to store on a common destination. Usually used in
            conjunction with ``action="store_const"``.
        :param iterable choices:
            Possible values for the option.
        :param callable callback:
            Callback used if the action is ``"callback"``.
        :param iterable callback_args:
            Additional positional arguments to the callback callable.
        :param dictionary callback_kwargs:
            Keyword arguments to the callback callable.
        :param str help:
            Help text displayed in the usage information.
        :param str metavar:
            Name to use instead of the long option name for help text.

        The following parameters are for Flake8's option handling alone.

        :param bool parse_from_config:
            Whether or not this option should be parsed out of config files.
        :param bool comma_separated_list:
            Whether the option is a comma separated list when parsing from a
            config file.
        :param bool normalize_paths:
            Whether the option is expecting a path or list of paths and should
            attempt to normalize the paths to absolute paths.

        :raises ValueError:
            When ``parse_from_config`` is True but no ``long_option_name``
            was given (the config key is derived from the long name).
        """
        self.short_option_name = short_option_name
        self.long_option_name = long_option_name
        self.option_args = [
            x for x in (short_option_name, long_option_name) if x is not None
        ]
        self.option_kwargs = {
            'action': action,
            'default': default,
            'type': type,
            'dest': self._make_dest(dest),
            'nargs': nargs,
            'const': const,
            'choices': choices,
            'callback': callback,
            'callback_args': callback_args,
            'callback_kwargs': callback_kwargs,
            'help': help,
            'metavar': metavar,
        }
        # Set attributes for our option arguments
        for key, value in self.option_kwargs.items():
            setattr(self, key, value)
        # Set our custom attributes
        self.parse_from_config = parse_from_config
        self.comma_separated_list = comma_separated_list
        self.normalize_paths = normalize_paths
        self.config_name = None
        if parse_from_config:
            if not long_option_name:
                raise ValueError('When specifying parse_from_config=True, '
                                 'a long_option_name must also be specified.')
            # '--foo-bar' -> 'foo_bar': the key looked up in config files.
            self.config_name = long_option_name[2:].replace('-', '_')
        self._opt = None
    def __repr__(self):
        """Simple representation of an Option class."""
        # The previous template rendered ``callback={callback}`` twice;
        # each kwarg now appears exactly once.
        return (
            'Option({0}, {1}, action={action}, default={default}, '
            'dest={dest}, type={type}, callback={callback}, '
            'callback_args={callback_args}, '
            'callback_kwargs={callback_kwargs}, help={help}, '
            'metavar={metavar})'
        ).format(self.short_option_name, self.long_option_name,
                 **self.option_kwargs)
    def _make_dest(self, dest):
        # Mirror optparse's destination derivation: explicit dest wins,
        # then the long name ('--foo-bar' -> 'foo_bar'), then the short
        # option letter.
        if dest:
            return dest
        if self.long_option_name:
            return self.long_option_name[2:].replace('-', '_')
        return self.short_option_name[1]
    def normalize(self, value):
        """Normalize the value based on the option configuration."""
        if self.normalize_paths:
            # Decide whether to parse a list of paths or a single path
            normalize = utils.normalize_path
            if self.comma_separated_list:
                normalize = utils.normalize_paths
            return normalize(value)
        elif self.comma_separated_list:
            return utils.parse_comma_separated_list(value)
        return value
    def to_optparse(self):
        """Convert a Flake8 Option to an optparse Option (cached)."""
        if self._opt is None:
            self._opt = optparse.Option(*self.option_args,
                                        **self.option_kwargs)
        return self._opt
class OptionManager(object):
    """Manage Options and OptionParser while adding post-processing."""
    def __init__(self, prog=None, version=None,
                 usage='%prog [options] file file ...'):
        """Initialize an instance of an OptionManager.

        :param str prog:
            Name of the actual program (e.g., flake8).
        :param str version:
            Version string for the program.
        :param str usage:
            Basic usage string used by the OptionParser.
        """
        self.parser = optparse.OptionParser(prog=prog, version=version,
                                            usage=usage)
        #: Mapping of config-file option names to Option instances that
        #: declared ``parse_from_config=True``.
        self.config_options_dict = {}
        #: Every Option registered through :meth:`add_option`, in order.
        self.options = []
        self.program_name = prog
        self.version = version
        #: Set of ``(name, version)`` tuples added via
        #: :meth:`register_plugin`.
        self.registered_plugins = set()
        #: Set of error codes plugins have added to the default ignore list.
        self.extended_default_ignore = set()
    @staticmethod
    def format_plugin(plugin_tuple):
        """Convert a plugin tuple into a dictionary mapping name to value."""
        return dict(zip(["name", "version"], plugin_tuple))
    def add_option(self, *args, **kwargs):
        """Create and register a new option.

        See parameters for :class:`~flake8.options.manager.Option` for
        acceptable arguments to this method.

        .. note::
            ``short_option_name`` and ``long_option_name`` may be specified
            positionally as they are with optparse normally.
        """
        # Allow add_option('--long-only', ...) without an explicit None for
        # the short option name.
        if len(args) == 1 and args[0].startswith('--'):
            args = (None, args[0])
        option = Option(*args, **kwargs)
        self.parser.add_option(option.to_optparse())
        self.options.append(option)
        if option.parse_from_config:
            self.config_options_dict[option.config_name] = option
        LOG.debug('Registered option "%s".', option)
    def remove_from_default_ignore(self, error_codes):
        """Remove specified error codes from the default ignore list.

        :param list error_codes:
            List of strings that are the error/warning codes to attempt to
            remove from the extended default ignore list.
        """
        LOG.debug('Removing %r from the default ignore list', error_codes)
        for error_code in error_codes:
            try:
                # ``extended_default_ignore`` (the set built by
                # :meth:`extend_default_ignore`) is the state to mutate;
                # the previous code addressed the *method*
                # ``extend_default_ignore`` and caught ValueError, but
                # ``set.remove`` raises KeyError for missing members.
                self.extended_default_ignore.remove(error_code)
            except KeyError:
                LOG.debug('Attempted to remove %s from default ignore'
                          ' but it was not a member of the list.', error_code)
    def extend_default_ignore(self, error_codes):
        """Extend the default ignore list with the error codes provided.

        :param list error_codes:
            List of strings that are the error/warning codes with which to
            extend the default ignore list.
        """
        LOG.debug('Extending default ignore list with %r', error_codes)
        self.extended_default_ignore.update(error_codes)
    def generate_versions(self, format_str='%(name)s: %(version)s'):
        """Generate a comma-separated list of versions of plugins."""
        return ', '.join(
            format_str % self.format_plugin(plugin)
            for plugin in self.registered_plugins
        )
    def update_version_string(self):
        """Update the flake8 version string with plugin versions."""
        self.parser.version = (self.version + ' (' +
                               self.generate_versions() + ')')
    def generate_epilog(self):
        """Create an epilog with the version and name of each of plugin."""
        plugin_version_format = '%(name)s: %(version)s'
        self.parser.epilog = 'Installed plugins: ' + self.generate_versions(
            plugin_version_format
        )
    def parse_args(self, args=None, values=None):
        """Simple proxy to calling the OptionParser's parse_args method.

        Refreshes the epilog/version strings first, then normalizes every
        registered option's parsed value in place.
        """
        self.generate_epilog()
        self.update_version_string()
        options, xargs = self.parser.parse_args(args, values)
        for option in self.options:
            old_value = getattr(options, option.dest)
            setattr(options, option.dest, option.normalize(old_value))
        return options, xargs
    def register_plugin(self, name, version):
        """Register a plugin relying on the OptionManager.

        :param str name:
            The name of the checker itself. This will be the ``name``
            attribute of the class or function loaded from the entry-point.
        :param str version:
            The version of the checker that we're using.
        """
        self.registered_plugins.add((name, version))

View file

@ -0,0 +1 @@
"""Submodule of built-in plugins and plugin managers."""

View file

@ -0,0 +1,97 @@
"""Independent implementation of a Trie tree."""
__all__ = ('Trie', 'TrieNode')
def _iterate_stringlike_objects(string):
for i in range(len(string)):
yield string[i:i + 1]
class Trie(object):
    """The object that manages the trie nodes."""
    def __init__(self):
        """Initialize an empty trie."""
        self.root = TrieNode(None, None)
    def add(self, path, node_data):
        """Append ``node_data`` to the node at ``path``, creating nodes."""
        current = self.root
        for piece in _iterate_stringlike_objects(path):
            # TrieNode instances are always truthy, so ``or`` safely falls
            # back to creating a new child only when no match exists.
            current = current.find_prefix(piece) or current.add_child(piece, [])
        current.data.append(node_data)
    def find(self, path):
        """Return the node at ``path``, or None if any segment is missing."""
        current = self.root
        for piece in _iterate_stringlike_objects(path):
            current = current.find_prefix(piece)
            if current is None:
                return None
        return current
    def traverse(self):
        """Traverse this tree.

        This performs a depth-first pre-order traversal of children in this
        tree. It returns the results consistently by first sorting the
        children based on their prefix and then traversing them in
        alphabetical order.
        """
        return self.root.traverse()
class TrieNode(object):
    """The majority of the implementation details of a Trie."""
    def __init__(self, prefix, data, children=None):
        """Initialize a TrieNode with data and children."""
        self.prefix = prefix
        self.data = data
        self.children = children if children else {}
    def __repr__(self):
        """Generate an easy to read representation of the node."""
        return 'TrieNode(prefix={0}, data={1})'.format(self.prefix, self.data)
    def find_prefix(self, prefix):
        """Find the prefix in the children of this node.

        :returns: A child matching the prefix or None.
        :rtype: :class:`~TrieNode` or None
        """
        return self.children.get(prefix)
    def add_child(self, prefix, data, children=None):
        """Create and add a new child node.

        :returns: The newly created node
        :rtype: :class:`~TrieNode`
        """
        child = TrieNode(prefix, data, children)
        self.children[prefix] = child
        return child
    def traverse(self):
        """Traverse children of this node.

        This performs a depth-first pre-order traversal of the remaining
        children in this sub-tree, visiting children in sorted prefix order
        so results are consistent between runs.
        """
        for key in sorted(self.children):
            node = self.children[key]
            yield node
            for descendant in node.traverse():
                yield descendant

View file

@ -0,0 +1,458 @@
"""Plugin loading and management logic and classes."""
import collections
import logging
import pkg_resources
from flake8 import exceptions
from flake8 import utils
from flake8.plugins import notifier
LOG = logging.getLogger(__name__)
__all__ = (
'Checkers',
'Listeners',
'Plugin',
'PluginManager',
'ReportFormatters',
)
NO_GROUP_FOUND = object()
class Plugin(object):
    """Wrap an EntryPoint from setuptools and other logic."""
    def __init__(self, name, entry_point):
        """Initialize our Plugin.

        :param str name:
            Name of the entry-point as it was registered with setuptools.
        :param entry_point:
            EntryPoint returned by setuptools.
        :type entry_point:
            setuptools.EntryPoint
        """
        self.name = name
        self.entry_point = entry_point
        # Lazily-populated caches, each filled on first access by the
        # corresponding property/method below.
        self._plugin = None
        self._parameters = None
        self._group = None
        self._plugin_name = None
        self._version = None
    def __repr__(self):
        """Provide an easy to read description of the current plugin."""
        return 'Plugin(name="{0}", entry_point="{1}")'.format(
            self.name, self.entry_point
        )
    def is_in_a_group(self):
        """Determine if this plugin is in a group.

        :returns:
            True if the plugin is in a group, otherwise False.
        :rtype:
            bool
        """
        return self.group() is not None
    def group(self):
        """Find and parse the group the plugin is in.

        A grouped plugin is named ``<group>.<member>``; the text before the
        first ``.`` is the group. The NO_GROUP_FOUND sentinel is cached for
        ungrouped plugins so the name is only parsed once.
        """
        if self._group is None:
            name = self.name.split('.', 1)
            if len(name) > 1:
                self._group = name[0]
            else:
                self._group = NO_GROUP_FOUND
        if self._group is NO_GROUP_FOUND:
            return None
        return self._group
    @property
    def parameters(self):
        """List of arguments that need to be passed to the plugin."""
        if self._parameters is None:
            self._parameters = utils.parameters_for(self)
        return self._parameters
    @property
    def plugin(self):
        """The loaded (and cached) plugin associated with the entry-point.

        This property implicitly loads the plugin and then caches it.
        """
        self.load_plugin()
        return self._plugin
    @property
    def version(self):
        """Return the version of the plugin."""
        if self._version is None:
            # Grouped plugins report their module's __version__ (via
            # version_for); ungrouped plugins are expected to expose a
            # ``version`` attribute themselves.
            if self.is_in_a_group():
                self._version = version_for(self)
            else:
                self._version = self.plugin.version
        return self._version
    @property
    def plugin_name(self):
        """Return the name of the plugin."""
        if self._plugin_name is None:
            if self.is_in_a_group():
                self._plugin_name = self.group()
            else:
                self._plugin_name = self.plugin.name
        return self._plugin_name
    @property
    def off_by_default(self):
        """Return whether the plugin is ignored by default."""
        return getattr(self.plugin, 'off_by_default', False)
    def execute(self, *args, **kwargs):
        r"""Call the plugin with \*args and \*\*kwargs."""
        return self.plugin(*args, **kwargs)  # pylint: disable=not-callable
    def _load(self, verify_requirements):
        # Avoid relying on hasattr() here.
        # Newer setuptools EntryPoints provide resolve()/require(); older
        # ones only provide load(require=...).
        resolve = getattr(self.entry_point, 'resolve', None)
        require = getattr(self.entry_point, 'require', None)
        if resolve and require:
            if verify_requirements:
                LOG.debug('Verifying plugin "%s"\'s requirements.',
                          self.name)
                require()
            self._plugin = resolve()
        else:
            self._plugin = self.entry_point.load(
                require=verify_requirements
            )
    def load_plugin(self, verify_requirements=False):
        """Retrieve the plugin for this entry-point.

        This loads the plugin and caches it on the instance; subsequent
        calls return immediately without reloading.

        :param bool verify_requirements:
            Whether or not to make setuptools verify that the requirements for
            the plugin are satisfied.
        :returns:
            Nothing
        :raises flake8.exceptions.FailedToLoadPlugin:
            When loading the entry-point raises any exception.
        """
        if self._plugin is None:
            LOG.info('Loading plugin "%s" from entry-point.', self.name)
            try:
                self._load(verify_requirements)
            except Exception as load_exception:
                LOG.exception(load_exception, exc_info=True)
                failed_to_load = exceptions.FailedToLoadPlugin(
                    plugin=self,
                    exception=load_exception,
                )
                LOG.critical(str(failed_to_load))
                raise failed_to_load
    def enable(self, optmanager):
        """Remove plugin name from the default ignore list."""
        optmanager.remove_from_default_ignore([self.name])
    def disable(self, optmanager):
        """Add the plugin name to the default ignore list."""
        optmanager.extend_default_ignore([self.name])
    def provide_options(self, optmanager, options, extra_args):
        """Pass the parsed options and extra arguments to the plugin."""
        parse_options = getattr(self.plugin, 'parse_options', None)
        if parse_options is not None:
            LOG.debug('Providing options to plugin "%s".', self.name)
            try:
                # Prefer the three-argument signature; fall back to the
                # legacy single-argument form for older plugins.
                parse_options(optmanager, options, extra_args)
            except TypeError:
                parse_options(options)
        if self.name in options.enable_extensions:
            self.enable(optmanager)
    def register_options(self, optmanager):
        """Register the plugin's command-line options on the OptionManager.

        :param optmanager:
            Instantiated OptionManager to register options on.
        :type optmanager:
            flake8.options.manager.OptionManager
        :returns:
            Nothing
        """
        add_options = getattr(self.plugin, 'add_options', None)
        if add_options is not None:
            LOG.debug(
                'Registering options from plugin "%s" on OptionManager %r',
                self.name, optmanager
            )
            add_options(optmanager)
        if self.off_by_default:
            self.disable(optmanager)
class PluginManager(object):  # pylint: disable=too-few-public-methods
    """Find and manage plugins consistently."""
    def __init__(self, namespace, verify_requirements=False):
        """Initialize the manager.

        :param str namespace:
            Namespace of the plugins to manage, e.g., 'flake8.extension'.
        :param bool verify_requirements:
            Whether or not to make setuptools verify that the requirements for
            the plugin are satisfied.
        """
        self.namespace = namespace
        self.verify_requirements = verify_requirements
        #: Mapping of entry-point name to :class:`Plugin` instance.
        self.plugins = {}
        #: Entry-point names in the order they were discovered.
        self.names = []
        self._load_all_plugins()
    def _load_all_plugins(self):
        # Discover every entry-point in our namespace and wrap it in a
        # Plugin; the underlying plugin object itself is loaded lazily.
        LOG.info('Loading entry-points for "%s".', self.namespace)
        for entry_point in pkg_resources.iter_entry_points(self.namespace):
            name = entry_point.name
            self.plugins[name] = Plugin(name, entry_point)
            self.names.append(name)
            LOG.debug('Loaded %r for plugin "%s".', self.plugins[name], name)
    def map(self, func, *args, **kwargs):
        r"""Call ``func`` with the plugin and \*args and \**kwargs after.

        This yields the return value from ``func`` for each plugin.

        :param collections.Callable func:
            Function to call with each plugin. Signature should at least be:

            .. code-block:: python

                def myfunc(plugin):
                    pass

            Any extra positional or keyword arguments specified with map will
            be passed along to this function after the plugin. The plugin
            passed is a :class:`~flake8.plugins.manager.Plugin`.
        :param args:
            Positional arguments to pass to ``func`` after each plugin.
        :param kwargs:
            Keyword arguments to pass to ``func`` after each plugin.
        """
        for name in self.names:
            yield func(self.plugins[name], *args, **kwargs)
    def versions(self):
        # () -> generator of (str, str) tuples
        """Generate the versions of plugins.

        Grouped entry-points that share a plugin name are reported only
        once.

        :returns:
            Tuples of the plugin_name and version
        :rtype:
            tuple
        """
        plugins_seen = set()
        for entry_point_name in self.names:
            plugin = self.plugins[entry_point_name]
            plugin_name = plugin.plugin_name
            if plugin.plugin_name in plugins_seen:
                continue
            plugins_seen.add(plugin_name)
            yield (plugin_name, plugin.version)
def version_for(plugin):
    # (Plugin) -> Union[str, NoneType]
    """Determine the version of a plugin by its module.

    :param plugin:
        The loaded plugin
    :type plugin:
        Plugin
    :returns:
        version string for the module
    :rtype:
        str
    """
    try:
        plugin_module = __import__(plugin.plugin.__module__)
    except ImportError:
        return None
    return getattr(plugin_module, '__version__', None)
class PluginTypeManager(object):
    """Parent class for most of the specific plugin types."""
    #: Entry-point namespace; concrete subclasses must override this.
    namespace = None
    def __init__(self):
        """Initialize the plugin type's manager."""
        self.manager = PluginManager(self.namespace)
        self.plugins_loaded = False
    def __contains__(self, name):
        """Check if the entry-point name is in this plugin type manager."""
        LOG.debug('Checking for "%s" in plugin type manager.', name)
        return name in self.plugins
    def __getitem__(self, name):
        """Retrieve a plugin by its name."""
        LOG.debug('Retrieving plugin for "%s".', name)
        return self.plugins[name]
    def get(self, name, default=None):
        """Retrieve the plugin referred to by ``name`` or return the default.

        :param str name:
            Name of the plugin to retrieve.
        :param default:
            Default value to return.
        :returns:
            Plugin object referred to by name, if it exists.
        :rtype:
            :class:`Plugin`
        """
        if name in self:
            return self[name]
        return default
    @property
    def names(self):
        """Proxy attribute to underlying manager."""
        return self.manager.names
    @property
    def plugins(self):
        """Proxy attribute to underlying manager."""
        return self.manager.plugins
    @staticmethod
    def _generate_call_function(method_name, optmanager, *args, **kwargs):
        def generated_function(plugin):
            """Attempt to call a specific method on a plugin."""
            method = getattr(plugin, method_name, None)
            # callable() replaces isinstance(method, collections.Callable):
            # the collections alias was removed in Python 3.10, and
            # callable(None) is False so the missing-attribute default is
            # rejected in the same check.
            if callable(method):
                return method(optmanager, *args, **kwargs)
        return generated_function
    def load_plugins(self):
        """Load all plugins of this type that are managed by this manager."""
        if self.plugins_loaded:
            return
        def load_plugin(plugin):
            """Call each plugin's load_plugin method."""
            return plugin.load_plugin()
        plugins = list(self.manager.map(load_plugin))
        # Do not set plugins_loaded if we run into an exception
        self.plugins_loaded = True
        return plugins
    def register_plugin_versions(self, optmanager):
        """Register the plugins and their versions with the OptionManager."""
        self.load_plugins()
        for (plugin_name, version) in self.manager.versions():
            optmanager.register_plugin(name=plugin_name, version=version)
    def register_options(self, optmanager):
        """Register all of the checkers' options to the OptionManager."""
        self.load_plugins()
        call_register_options = self._generate_call_function(
            'register_options', optmanager,
        )
        list(self.manager.map(call_register_options))
    def provide_options(self, optmanager, options, extra_args):
        """Provide parsed options and extra arguments to the plugins."""
        call_provide_options = self._generate_call_function(
            'provide_options', optmanager, options, extra_args,
        )
        list(self.manager.map(call_provide_options))
class NotifierBuilderMixin(object):  # pylint: disable=too-few-public-methods
    """Mixin class that builds a Notifier from a PluginManager."""
    def build_notifier(self):
        """Build a Notifier for our Listeners.

        :returns:
            Object to notify our listeners of certain error codes and
            warnings.
        :rtype:
            :class:`~flake8.notifier.Notifier`
        """
        notifier_trie = notifier.Notifier()
        for name in self.names:
            # ``self.manager`` is a PluginManager, which exposes plugins
            # only through its ``plugins`` dict -- it defines no
            # ``__getitem__``, so the previous ``self.manager[name]``
            # raised TypeError.
            notifier_trie.register_listener(name, self.manager.plugins[name])
        return notifier_trie
class Checkers(PluginTypeManager):
    """All of the checkers registered through entry-points."""
    namespace = 'flake8.extension'
    def checks_expecting(self, argument_name):
        """Retrieve checks that expect an argument with the specified name.

        Find all checker plugins that are expecting a specific argument.
        """
        for registered_plugin in self.plugins.values():
            if registered_plugin.parameters[0] == argument_name:
                yield registered_plugin
    @property
    def ast_plugins(self):
        """List of plugins that expect the AST tree."""
        try:
            cached = self._ast_plugins
        except AttributeError:
            cached = []
        if not cached:
            cached = list(self.checks_expecting('tree'))
            self._ast_plugins = cached
        return cached
    @property
    def logical_line_plugins(self):
        """List of plugins that expect the logical lines."""
        try:
            cached = self._logical_line_plugins
        except AttributeError:
            cached = []
        if not cached:
            cached = list(self.checks_expecting('logical_line'))
            self._logical_line_plugins = cached
        return cached
    @property
    def physical_line_plugins(self):
        """List of plugins that expect the physical lines."""
        try:
            cached = self._physical_line_plugins
        except AttributeError:
            cached = []
        if not cached:
            cached = list(self.checks_expecting('physical_line'))
            self._physical_line_plugins = cached
        return cached
class Listeners(PluginTypeManager, NotifierBuilderMixin):
    """All of the listeners registered through entry-points."""
    #: Entry-point namespace from which listener plugins are loaded.
    namespace = 'flake8.listen'
class ReportFormatters(PluginTypeManager):
    """All of the report formatters registered through entry-points."""
    #: Entry-point namespace from which report formatter plugins are loaded.
    namespace = 'flake8.report'

View file

@ -0,0 +1,46 @@
"""Implementation of the class that registers and notifies listeners."""
from flake8.plugins import _trie
class Notifier(object):
    """Object that tracks and notifies listener objects."""
    def __init__(self):
        """Initialize an empty notifier object."""
        self.listeners = _trie.Trie()
    def listeners_for(self, error_code):
        """Retrieve listeners for an error_code.

        Listeners registered for a prefix of ``error_code`` are included:
        there may be listeners registered for E1, E100, E101, E110, E112,
        and E126, and the listeners for E100 also include those registered
        for E1. Listeners are yielded from the most specific prefix to the
        least.

        Example usage:

        .. code-block:: python

            from flake8 import notifier
            n = notifier.Notifier()
            # register listeners
            for listener in n.listeners_for('W102'):
                listener.notify(...)
        """
        for end in range(len(error_code), 0, -1):
            match = self.listeners.find(error_code[:end])
            for listener in getattr(match, 'data', []):
                yield listener
    def notify(self, error_code, *args, **kwargs):
        """Notify all listeners for the specified error code."""
        for registered in self.listeners_for(error_code):
            registered.notify(error_code, *args, **kwargs)
    def register_listener(self, error_code, listener):
        """Register a listener for a specific error_code."""
        self.listeners.add(error_code, listener)

View file

@ -0,0 +1,141 @@
"""Plugin built-in to Flake8 to treat pyflakes as a plugin."""
# -*- coding: utf-8 -*-
from __future__ import absolute_import
try:
# The 'demandimport' breaks pyflakes and flake8.plugins.pyflakes
from mercurial import demandimport
except ImportError:
pass
else:
demandimport.disable()
import os
import pyflakes
import pyflakes.checker
from flake8 import utils
def patch_pyflakes():
    """Add error codes to Pyflakes messages.

    Attaches a ``flake8_msg`` attribute ('<code> <message>') to each public
    message class in ``pyflakes.messages``; unknown classes get 'F999'.
    """
    error_code_for = {}
    for mapping in ('F401 UnusedImport',
                    'F402 ImportShadowedByLoopVar',
                    'F403 ImportStarUsed',
                    'F404 LateFutureImport',
                    'F405 ImportStarUsage',
                    'F810 Redefined',
                    'F811 RedefinedWhileUnused',
                    'F812 RedefinedInListComp',
                    'F821 UndefinedName',
                    'F822 UndefinedExport',
                    'F823 UndefinedLocal',
                    'F831 DuplicateArgument',
                    'F841 UnusedVariable'):
        code, message_class_name = mapping.split()
        error_code_for[message_class_name] = code
    for name, obj in vars(pyflakes.messages).items():
        if name[0].isupper() and obj.message:
            obj.flake8_msg = '%s %s' % (error_code_for.get(name, 'F999'),
                                        obj.message)
# Patch the pyflakes message classes once at import time so every checker
# run sees the flake8 error codes.
patch_pyflakes()
class FlakesChecker(pyflakes.checker.Checker):
    """Subclass the Pyflakes checker to conform with the flake8 API."""
    # Plugin metadata reported to Flake8's plugin machinery.
    name = 'pyflakes'
    version = pyflakes.__version__
    def __init__(self, tree, filename):
        """Initialize the PyFlakes plugin with an AST tree and filename.

        :param tree:
            The AST of the file being checked.
        :param str filename:
            Name of the file being checked.
        """
        filename = utils.normalize_paths(filename)[0]
        with_doctest = self.with_doctest
        # Doctest checking is turned on when the file falls under one of
        # the --include-in-doctest prefixes.
        included_by = [include for include in self.include_in_doctest
                       if include != '' and filename.startswith(include)]
        if included_by:
            with_doctest = True
        for exclude in self.exclude_from_doctest:
            if exclude != '' and filename.startswith(exclude):
                with_doctest = False
                # An include nested inside this exclude wins doctests back.
                overlaped_by = [include for include in included_by
                                if include.startswith(exclude)]
                if overlaped_by:
                    with_doctest = True
        super(FlakesChecker, self).__init__(tree, filename,
                                            withDoctest=with_doctest)
    @classmethod
    def add_options(cls, parser):
        """Register options for PyFlakes on the Flake8 OptionManager."""
        parser.add_option(
            '--builtins', parse_from_config=True, comma_separated_list=True,
            help="define more built-ins, comma separated",
        )
        parser.add_option(
            '--doctests', default=False, action='store_true',
            parse_from_config=True,
            help="check syntax of the doctests",
        )
        parser.add_option(
            '--include-in-doctest', default='',
            dest='include_in_doctest', parse_from_config=True,
            comma_separated_list=True, normalize_paths=True,
            help='Run doctests only on these files',
            type='string',
        )
        parser.add_option(
            '--exclude-from-doctest', default='',
            dest='exclude_from_doctest', parse_from_config=True,
            comma_separated_list=True, normalize_paths=True,
            help='Skip these files when running doctests',
            type='string',
        )
    @classmethod
    def parse_options(cls, options):
        """Parse option values from Flake8's OptionManager."""
        # NOTE: these assignments mutate *class* attributes, so every
        # FlakesChecker instantiated afterwards sees the parsed options.
        if options.builtins:
            cls.builtIns = cls.builtIns.union(options.builtins)
        cls.with_doctest = options.doctests
        included_files = []
        for included_file in options.include_in_doctest:
            if included_file == '':
                continue
            # Give relative paths an explicit './' prefix so startswith()
            # comparisons in __init__ behave consistently.
            if not included_file.startswith((os.sep, './', '~/')):
                included_files.append('./' + included_file)
            else:
                included_files.append(included_file)
        cls.include_in_doctest = utils.normalize_paths(included_files)
        excluded_files = []
        for excluded_file in options.exclude_from_doctest:
            if excluded_file == '':
                continue
            if not excluded_file.startswith((os.sep, './', '~/')):
                excluded_files.append('./' + excluded_file)
            else:
                excluded_files.append(excluded_file)
        cls.exclude_from_doctest = utils.normalize_paths(excluded_files)
        # A path may not appear in both lists at once.
        inc_exc = set(cls.include_in_doctest).intersection(
            cls.exclude_from_doctest
        )
        if inc_exc:
            raise ValueError('"%s" was specified in both the '
                             'include-in-doctest and exclude-from-doctest '
                             'options. You are not allowed to specify it in '
                             'both for doctesting.' % inc_exc)
    def run(self):
        """Run the plugin.

        Pyflakes stores its results on ``self.messages``; translate each
        into the ``(line, col, text, type)`` tuples Flake8 expects.
        """
        for message in self.messages:
            col = getattr(message, 'col', 0)
            yield (message.lineno,
                   col,
                   (message.flake8_msg % message.message_args),
                   message.__class__)

430
src/flake8/processor.py Normal file
View file

@ -0,0 +1,430 @@
"""Module containing our file processor that tokenizes a file for checks."""
import contextlib
import io
import logging
import re
import sys
import tokenize
import flake8
from flake8 import defaults
from flake8 import exceptions
from flake8 import utils
LOG = logging.getLogger(__name__)
# Value of ast.PyCF_ONLY_AST: makes compile() return an AST rather than a
# code object (hard-coded here to avoid importing :mod:`ast` for one flag).
PyCF_ONLY_AST = 1024
# Token types that end a physical line.
NEWLINE = frozenset([tokenize.NL, tokenize.NEWLINE])
# Work around Python < 2.6 behaviour, which does not generate NL after
# a comment which is on a line by itself. (Probed by tokenizing '#\n'.)
COMMENT_WITH_NL = tokenize.generate_tokens(['#\n'].pop).send(None)[1] == '#\n'
# Token types skipped when assembling a logical line.
SKIP_TOKENS = frozenset([tokenize.NL, tokenize.NEWLINE, tokenize.INDENT,
                         tokenize.DEDENT])
class FileProcessor(object):
    """Processes a file and holds state.

    This processes a file by generating tokens, logical and physical lines,
    and AST trees. This also provides a way of passing state about the file
    to checks expecting that state. Any public attribute on this object can
    be requested by a plugin. The known public attributes are:

    - :attr:`blank_before`
    - :attr:`blank_lines`
    - :attr:`checker_state`
    - :attr:`indent_char`
    - :attr:`indent_level`
    - :attr:`line_number`
    - :attr:`logical_line`
    - :attr:`max_line_length`
    - :attr:`multiline`
    - :attr:`noqa`
    - :attr:`previous_indent_level`
    - :attr:`previous_logical`
    - :attr:`tokens`
    - :attr:`total_lines`
    - :attr:`verbose`
    """

    #: Matches a file-level ``# flake8: noqa`` comment.
    NOQA_FILE = re.compile(r'\s*# flake8[:=]\s*noqa', re.I)

    def __init__(self, filename, options, lines=None):
        """Initialize our file processor.

        :param str filename:
            Name of the file to process
        :param options:
            Parsed option values; ``hang_closing``, ``max_line_length``,
            and ``verbose`` are read here.
        :param list lines:
            Optionally provide the file's lines directly; when ``None``
            they are read from ``filename`` (or stdin).
        """
        self.filename = filename
        self.lines = lines
        if lines is None:
            self.lines = self.read_lines()
        self.strip_utf_bom()
        self.options = options
        # Defaults for public attributes
        #: Number of preceding blank lines
        self.blank_before = 0
        #: Number of blank lines
        self.blank_lines = 0
        #: Checker states for each plugin
        self._checker_states = {}
        #: Current checker state
        self.checker_state = None
        #: User provided option for hang closing
        self.hang_closing = options.hang_closing
        #: Character used for indentation
        self.indent_char = None
        #: Current level of indentation
        self.indent_level = 0
        #: Line number in the file
        self.line_number = 0
        #: Current logical line
        self.logical_line = ''
        #: Maximum line length as configured by the user
        self.max_line_length = options.max_line_length
        #: Whether the current physical line is multiline
        self.multiline = False
        #: Whether or not we're observing NoQA
        self.noqa = False
        #: Previous level of indentation
        self.previous_indent_level = 0
        #: Previous logical line
        self.previous_logical = ''
        #: Current set of tokens
        self.tokens = []
        #: Total number of lines in the file
        self.total_lines = len(self.lines)
        #: Verbosity level of Flake8
        self.verbose = options.verbose
        #: Statistics dictionary
        self.statistics = {
            'logical lines': 0,
        }

    @contextlib.contextmanager
    def inside_multiline(self, line_number):
        """Context-manager to toggle the multiline attribute."""
        self.line_number = line_number
        self.multiline = True
        yield
        self.multiline = False

    def reset_blank_before(self):
        """Reset the blank_before attribute to zero."""
        self.blank_before = 0

    def delete_first_token(self):
        """Delete the first token in the list of tokens."""
        del self.tokens[0]

    def visited_new_blank_line(self):
        """Note that we visited a new blank line."""
        self.blank_lines += 1

    def update_state(self, mapping):
        """Update the indent level based on the logical line mapping."""
        (start_row, start_col) = mapping[0][1]
        start_line = self.lines[start_row - 1]
        self.indent_level = expand_indent(start_line[:start_col])
        if self.blank_before < self.blank_lines:
            self.blank_before = self.blank_lines

    def update_checker_state_for(self, plugin):
        """Update the checker_state attribute for the plugin."""
        if 'checker_state' in plugin.parameters:
            self.checker_state = self._checker_states.setdefault(
                plugin.name, {}
            )

    def next_logical_line(self):
        """Record the previous logical line.

        This also resets the tokens list and the blank_lines count.
        """
        if self.logical_line:
            self.previous_indent_level = self.indent_level
            self.previous_logical = self.logical_line
        self.blank_lines = 0
        self.tokens = []

    def build_logical_line_tokens(self):
        """Build the mapping, comments, and logical line lists."""
        logical = []
        comments = []
        length = 0
        previous_row = previous_column = mapping = None
        for token_type, text, start, end, line in self.tokens:
            if token_type in SKIP_TOKENS:
                continue
            if not mapping:
                mapping = [(0, start)]
            if token_type == tokenize.COMMENT:
                comments.append(text)
                continue
            if token_type == tokenize.STRING:
                # Mask string contents so logical-line checks can't mistake
                # them for real syntax.
                text = mutate_string(text)
            if previous_row:
                (start_row, start_column) = start
                if previous_row != start_row:
                    row_index = previous_row - 1
                    column_index = previous_column - 1
                    previous_text = self.lines[row_index][column_index]
                    if (previous_text == ',' or
                            (previous_text not in '{[(' and
                                text not in '}])')):
                        text = ' ' + text
                elif previous_column != start_column:
                    text = line[previous_column:start_column] + text
            logical.append(text)
            length += len(text)
            mapping.append((length, end))
            (previous_row, previous_column) = end
        return comments, logical, mapping

    def build_ast(self):
        """Build an abstract syntax tree from the list of lines."""
        return compile(''.join(self.lines), '', 'exec', PyCF_ONLY_AST)

    def build_logical_line(self):
        """Build a logical line from the current tokens list."""
        comments, logical, mapping_list = self.build_logical_line_tokens()
        self.logical_line = ''.join(logical)
        self.statistics['logical lines'] += 1
        return ''.join(comments), self.logical_line, mapping_list

    def split_line(self, token):
        """Split a physical line's line based on new-lines.

        This also auto-increments the line number for the caller.
        """
        for line in token[1].split('\n')[:-1]:
            yield line
            self.line_number += 1

    def keyword_arguments_for(self, parameters, arguments=None):
        """Generate the keyword arguments for a list of parameters."""
        if arguments is None:
            arguments = {}
        for param in parameters:
            if param in arguments:
                continue
            try:
                arguments[param] = getattr(self, param)
            except AttributeError as exc:
                LOG.exception(exc)
                raise
        return arguments

    def check_physical_error(self, error_code, line):
        """Update attributes based on error code and line."""
        if error_code == 'E101':
            self.indent_char = line[0]

    def generate_tokens(self):
        """Tokenize the file and yield the tokens.

        :raises flake8.exceptions.InvalidSyntax:
            If a :class:`tokenize.TokenError` is raised while generating
            tokens.
        """
        try:
            for token in tokenize.generate_tokens(self.next_line):
                if token[2][0] > self.total_lines:
                    break
                self.tokens.append(token)
                yield token
        # NOTE(sigmavirus24): pycodestyle was catching both a SyntaxError
        # and a tokenize.TokenError. In looking at the source on Python 2 and
        # Python 3, the SyntaxError should never arise from generate_tokens.
        # If we were using tokenize.tokenize, we would have to catch that. Of
        # course, I'm going to be unsurprised to be proven wrong at a later
        # date.
        except tokenize.TokenError as exc:
            # ``TokenError.message`` only exists on Python 2; ``args[0]``
            # carries the message text on both Python 2 and Python 3.
            raise exceptions.InvalidSyntax(exc.args[0], exception=exc)

    def line_for(self, line_number):
        """Retrieve the physical line at the specified line number."""
        return self.lines[line_number - 1]

    def next_line(self):
        """Get the next line from the list."""
        if self.line_number >= self.total_lines:
            return ''
        line = self.lines[self.line_number]
        self.line_number += 1
        if self.indent_char is None and line[:1] in defaults.WHITESPACE:
            self.indent_char = line[0]
        return line

    def read_lines(self):
        # type: () -> List[str]
        """Read the lines for this file checker."""
        if self.filename is None or self.filename == '-':
            self.filename = 'stdin'
            return self.read_lines_from_stdin()
        return self.read_lines_from_filename()

    def _readlines_py2(self):
        # type: () -> List[str]
        with open(self.filename, 'rU') as fd:
            return fd.readlines()

    def _readlines_py3(self):
        # type: () -> List[str]
        try:
            with open(self.filename, 'rb') as fd:
                (coding, lines) = tokenize.detect_encoding(fd.readline)
                textfd = io.TextIOWrapper(fd, coding, line_buffering=True)
                return ([l.decode(coding) for l in lines] +
                        textfd.readlines())
        except (LookupError, SyntaxError, UnicodeError):
            # If we can't detect the codec with tokenize.detect_encoding, or
            # the detected encoding is incorrect, just fallback to latin-1.
            with open(self.filename, encoding='latin-1') as fd:
                return fd.readlines()

    def read_lines_from_filename(self):
        # type: () -> List[str]
        """Read the lines for a file."""
        if (2, 6) <= sys.version_info < (3, 0):
            readlines = self._readlines_py2
        elif (3, 0) <= sys.version_info < (4, 0):
            readlines = self._readlines_py3
        return readlines()

    def read_lines_from_stdin(self):
        # type: () -> List[str]
        """Read the lines from standard in."""
        return utils.stdin_get_value().splitlines(True)

    def should_ignore_file(self):
        # type: () -> bool
        """Check if ``# flake8: noqa`` is in the file to be ignored.

        :returns:
            True if a line matches :attr:`FileProcessor.NOQA_FILE`,
            otherwise False
        :rtype:
            bool
        """
        ignore_file = self.NOQA_FILE.search
        return any(ignore_file(line) for line in self.lines)

    def strip_utf_bom(self):
        # type: () -> NoneType
        """Strip the UTF bom from the lines of the file."""
        if not self.lines:
            # If we have nothing to analyze quit early
            return
        first_byte = ord(self.lines[0][0])
        if first_byte not in (0xEF, 0xFEFF):
            return
        # If the first byte of the file is a UTF-8 BOM, strip it
        if first_byte == 0xFEFF:
            self.lines[0] = self.lines[0][1:]
        elif self.lines[0][:3] == '\xEF\xBB\xBF':
            self.lines[0] = self.lines[0][3:]
def is_eol_token(token):
    """Check if the token is an end-of-line token."""
    # True for NL/NEWLINE tokens, or when the remainder of the physical line
    # after the token's end column is just an escaped line continuation.
    return token[0] in NEWLINE or token[4][token[3][1]:].lstrip() == '\\\n'
if COMMENT_WITH_NL:  # If on Python 2.6
    # Older tokenizers emit a comment token containing its own trailing
    # newline instead of a separate NL token. Wrap the predicate above
    # (captured as a default argument) to treat that case as end-of-line too.
    def is_eol_token(token, _is_eol_token=is_eol_token):
        """Check if the token is an end-of-line token."""
        return (_is_eol_token(token) or
                (token[0] == tokenize.COMMENT and token[1] == token[4]))
def is_multiline_string(token):
    """Check whether *token* is a string spanning multiple lines."""
    token_type, text = token[0], token[1]
    return token_type == tokenize.STRING and '\n' in text
def token_is_newline(token):
    """Determine whether the token's type is a newline token type."""
    token_type = token[0]
    return token_type in NEWLINE
def token_is_comment(token):
    """Check if the token type is a comment.

    Only ever True on interpreters where comments swallow their newline
    (see ``COMMENT_WITH_NL``).
    """
    if not COMMENT_WITH_NL:
        return False
    return token[0] == tokenize.COMMENT
def count_parentheses(current_parentheses_count, token_text):
    """Update the running count of open brackets.

    Treats ``None`` as zero, adds one for an opening bracket, subtracts one
    for a closing bracket, and leaves the count unchanged otherwise.
    """
    count = current_parentheses_count or 0
    if token_text in '([{':
        return count + 1
    if token_text in '}])':
        return count - 1
    return count
def log_token(log, token):
    """Log a token to a provided logging object."""
    (start_row, start_col) = token[2]
    (end_row, end_col) = token[3]
    if start_row == end_row:
        # Single-line token: show its column span.
        pos = '[%s:%s]' % (start_col or '', end_col)
    else:
        # Multi-line token: show the line where it ends.
        pos = 'l.%s' % end_row
    log.log(flake8._EXTRA_VERBOSE, 'l.%s\t%s\t%s\t%r' %
            (start_row, pos, tokenize.tok_name[token[0]], token[1]))
# NOTE(sigmavirus24): This was taken wholesale from
# https://github.com/PyCQA/pycodestyle
def expand_indent(line):
    r"""Return the amount of indentation.

    Tabs are expanded to the next multiple of 8.

    >>> expand_indent('    ')
    4
    >>> expand_indent('\t')
    8
    >>> expand_indent('   \t')
    8
    >>> expand_indent('        \t')
    16
    """
    if '\t' not in line:
        # Fast path: without tabs the indent is just the length of the
        # leading-whitespace prefix.
        return len(line) - len(line.lstrip())
    expanded = 0
    for character in line:
        if character == '\t':
            # Jump to the next multiple of eight.
            expanded = expanded // 8 * 8 + 8
        elif character == ' ':
            expanded += 1
        else:
            break
    return expanded
# NOTE(sigmavirus24): This was taken wholesale from
# https://github.com/PyCQA/pycodestyle. The in-line comments were edited to be
# more descriptive.
def mutate_string(text):
    """Replace contents with 'xxx' to prevent syntax matching.

    >>> mutate_string('"abc"')
    '"xxx"'
    >>> mutate_string("'''abc'''")
    "'''xxx'''"
    >>> mutate_string("r'abc'")
    "r'xxx'"
    """
    # NOTE(sigmavirus24): If there are string modifiers (e.g., b, u, r)
    # use the last "character" to determine if we're using single or double
    # quotes and then find the first instance of it
    start = text.index(text[-1]) + 1
    end = len(text) - 1
    # Check for triple-quoted strings
    if text[-3:] in ('"""', "'''"):
        start += 2
        end -= 2
    return text[:start] + 'x' * (end - start) + text[end:]

283
src/flake8/style_guide.py Normal file
View file

@ -0,0 +1,283 @@
"""Implementation of the StyleGuide used by Flake8."""
import collections
import enum
import linecache
import logging
import re
from flake8 import utils
__all__ = (
'StyleGuide',
)
LOG = logging.getLogger(__name__)
# TODO(sigmavirus24): Determine if we need to use enum/enum34
class Selected(enum.Enum):
    """Enum representing an explicitly or implicitly selected code."""
    # The code matched a rule the user actually configured.
    Explicitly = 'explicitly selected'
    # Selected only because nothing said otherwise (empty/absent lists).
    Implicitly = 'implicitly selected'
class Ignored(enum.Enum):
    """Enum representing an explicitly or implicitly ignored code."""
    # The code matched an entry in the user's ignore list.
    Explicitly = 'explicitly ignored'
    # The code failed to match a non-empty select list.
    Implicitly = 'implicitly ignored'
class Decision(enum.Enum):
    """Enum representing whether a code should be ignored or selected."""
    # The error will not be reported.
    Ignored = 'ignored error'
    # The error will be reported.
    Selected = 'selected error'
# Lightweight record for a single error reported by a check. ``physical_line``
# may be None, in which case it is looked up lazily via :mod:`linecache`
# (see StyleGuide.is_inline_ignored).
Error = collections.namedtuple(
    'Error',
    [
        'code',
        'filename',
        'line_number',
        'column_number',
        'text',
        'physical_line',
    ],
)
class StyleGuide(object):
    """Manage a Flake8 user's style guide."""
    # Matches an inline ``# noqa`` comment, optionally followed by a
    # comma-separated list of error codes (captured as ``codes``).
    # NOTE(review): the codes group does not allow spaces, so
    # ``# noqa: E1, E2`` only captures through the first comma — confirm
    # whether that is intentional.
    NOQA_INLINE_REGEXP = re.compile(
        # We're looking for items that look like this:
        # ``# noqa``
        # ``# noqa: E123``
        # ``# noqa: E123,W451,F921``
        # ``# NoQA: E123,W451,F921``
        # ``# NOQA: E123,W451,F921``
        # We do not care about the ``: `` that follows ``noqa``
        # We do not care about the casing of ``noqa``
        # We want a comma-separated list of errors
        '# noqa(?:: )?(?P<codes>[A-Z0-9,]+)?$',
        re.IGNORECASE
    )
    def __init__(self, options, listener_trie, formatter):
        """Initialize our StyleGuide.

        :param options:
            Parsed options; ``select``, ``ignore``, ``disable_noqa``, and
            ``stdin_display_name`` are read by this class.
        :param listener_trie:
            Trie of listeners whose ``notify`` method is called for each
            reported error.
        :param formatter:
            Formatter whose ``handle`` method reports errors to the user.
        """
        self.options = options
        self.listener = listener_trie
        self.formatter = formatter
        self._selected = tuple(options.select)
        self._ignored = tuple(options.ignore)
        # Cache of code -> Decision computed by should_report_error.
        self._decision_cache = {}
        # Mapping of filename -> line numbers present in the provided diff.
        self._parsed_diff = {}
    def is_user_selected(self, code):
        # type: (str) -> Union[Selected, Ignored]
        """Determine if the code has been selected by the user.

        :param str code:
            The code for the check that has been run.
        :returns:
            Selected.Implicitly if the selected list is empty,
            Selected.Explicitly if the selected list is not empty and a match
            was found,
            Ignored.Implicitly if the selected list is not empty but no match
            was found.
        """
        if not self._selected:
            return Selected.Implicitly
        if code.startswith(self._selected):
            return Selected.Explicitly
        return Ignored.Implicitly
    def is_user_ignored(self, code):
        # type: (str) -> Union[Selected, Ignored]
        """Determine if the code has been ignored by the user.

        :param str code:
            The code for the check that has been run.
        :returns:
            Selected.Implicitly if the ignored list is empty,
            Ignored.Explicitly if the ignored list is not empty and a match was
            found,
            Selected.Implicitly if the ignored list is not empty but no match
            was found.
        """
        if self._ignored and code.startswith(self._ignored):
            return Ignored.Explicitly
        return Selected.Implicitly
    def _decision_for(self, code):
        # type: (str) -> Decision
        # Only called when the code matched both a select and an ignore
        # prefix (see should_report_error), so both lists below are
        # non-empty.
        startswith = code.startswith
        selected = sorted([s for s in self._selected if startswith(s)])[0]
        ignored = sorted([i for i in self._ignored if startswith(i)])[0]
        # A select rule at least as specific as the ignore rule wins.
        if selected.startswith(ignored):
            return Decision.Selected
        return Decision.Ignored
    def should_report_error(self, code):
        # type: (str) -> Decision
        """Determine if the error code should be reported or ignored.

        This method only cares about the select and ignore rules as specified
        by the user in their configuration files and command-line flags.
        This method does not look at whether the specific line is being
        ignored in the file itself.

        :param str code:
            The code for the check that has been run.
        """
        decision = self._decision_cache.get(code)
        if decision is None:
            LOG.debug('Deciding if "%s" should be reported', code)
            selected = self.is_user_selected(code)
            ignored = self.is_user_ignored(code)
            LOG.debug('The user configured "%s" to be "%s", "%s"',
                      code, selected, ignored)
            if ((selected is Selected.Explicitly or
                    selected is Selected.Implicitly) and
                    ignored is Selected.Implicitly):
                decision = Decision.Selected
            elif (selected is Selected.Explicitly and
                    ignored is Ignored.Explicitly):
                # Both rules matched explicitly; fall back to specificity.
                decision = self._decision_for(code)
            elif (selected is Ignored.Implicitly or
                    ignored is Ignored.Explicitly):
                decision = Decision.Ignored  # pylint: disable=R0204
            self._decision_cache[code] = decision
        LOG.debug('"%s" will be "%s"', code, decision)
        return decision
    def is_inline_ignored(self, error):
        # type: (Error) -> bool
        """Determine if a comment has been added to ignore this line."""
        physical_line = error.physical_line
        # TODO(sigmavirus24): Determine how to handle stdin with linecache
        if self.options.disable_noqa:
            return False
        if physical_line is None:
            physical_line = linecache.getline(error.filename,
                                              error.line_number)
        noqa_match = self.NOQA_INLINE_REGEXP.search(physical_line)
        if noqa_match is None:
            LOG.debug('%r is not inline ignored', error)
            return False
        codes_str = noqa_match.groupdict()['codes']
        if codes_str is None:
            # A bare ``# noqa`` (no codes) silences everything on the line.
            LOG.debug('%r is ignored by a blanket ``# noqa``', error)
            return True
        codes = set(utils.parse_comma_separated_list(codes_str))
        if error.code in codes or error.code.startswith(tuple(codes)):
            LOG.debug('%r is ignored specifically inline with ``# noqa: %s``',
                      error, codes_str)
            return True
        LOG.debug('%r is not ignored inline with ``# noqa: %s``',
                  error, codes_str)
        return False
    def is_in_diff(self, error):
        # type: (Error) -> bool
        """Determine if an error is included in a diff's line ranges.

        This function relies on the parsed data added via
        :meth:`~StyleGuide.add_diff_ranges`. If that has not been called and
        we are not evaluating files in a diff, then this will always return
        True. If there are diff ranges, then this will return True if the
        line number in the error falls inside one of the ranges for the file
        (and assuming the file is part of the diff data). If there are diff
        ranges, this will return False if the file is not part of the diff
        data or the line number of the error is not in any of the ranges of
        the diff.

        :returns:
            True if there is no diff or if the error is in the diff's line
            number ranges. False if the error's line number falls outside
            the diff's line number ranges.
        :rtype:
            bool
        """
        if not self._parsed_diff:
            return True
        # NOTE(sigmavirus24): The parsed diff will be a defaultdict with
        # a set as the default value (if we have received it from
        # flake8.utils.parse_unified_diff). In that case ranges below
        # could be an empty set (which is False-y) or if someone else
        # is using this API, it could be None. If we could guarantee one
        # or the other, we would check for it more explicitly.
        line_numbers = self._parsed_diff.get(error.filename)
        if not line_numbers:
            return False
        return error.line_number in line_numbers
    def handle_error(self, code, filename, line_number, column_number, text,
                     physical_line=None):
        # type: (str, str, int, int, str, Optional[str]) -> int
        """Handle an error reported by a check.

        :param str code:
            The error code found, e.g., E123.
        :param str filename:
            The file in which the error was found.
        :param int line_number:
            The line number (where counting starts at 1) at which the error
            occurs.
        :param int column_number:
            The column number (where counting starts at 1) at which the error
            occurs.
        :param str text:
            The text of the error message.
        :param str physical_line:
            The actual physical line causing the error.
        :returns:
            1 if the error was reported. 0 if it was ignored. This is to allow
            for counting of the number of errors found that were not ignored.
        :rtype:
            int
        """
        error = Error(code, filename, line_number, column_number, text,
                      physical_line)
        if error.filename is None or error.filename == '-':
            # Normalize stdin to the user-configured display name.
            error = error._replace(filename=self.options.stdin_display_name)
        error_is_selected = (self.should_report_error(error.code) is
                             Decision.Selected)
        is_not_inline_ignored = self.is_inline_ignored(error) is False
        is_included_in_diff = self.is_in_diff(error)
        if (error_is_selected and is_not_inline_ignored and
                is_included_in_diff):
            self.formatter.handle(error)
            self.listener.notify(error.code, error)
            return 1
        return 0
    def add_diff_ranges(self, diffinfo):
        """Update the StyleGuide to filter out information not in the diff.

        This provides information to the StyleGuide so that only the errors
        in the line number ranges are reported.

        :param dict diffinfo:
            Dictionary mapping filenames to sets of line number ranges.
        """
        self._parsed_diff = diffinfo

279
src/flake8/utils.py Normal file
View file

@ -0,0 +1,279 @@
"""Utility methods for flake8."""
import collections
import fnmatch as _fnmatch
import inspect
import io
import os
import re
import sys
# Matches a unified-diff hunk header, e.g. ``@@ -17,6 +23,7 @@``, capturing
# the new-file starting row and (optional) row count.
DIFF_HUNK_REGEXP = re.compile(r'^@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? @@.*$')
def parse_comma_separated_list(value):
    # type: (Union[Sequence[str], str]) -> List[str]
    """Parse a comma-separated list.

    :param value:
        String or list of strings to be parsed and normalized.
    :returns:
        List of values with whitespace stripped.
    :rtype:
        list
    """
    if not value:
        return []
    if isinstance(value, (list, tuple)):
        items = value
    else:
        items = value.split(',')
    return [item.strip() for item in items]
def normalize_paths(paths, parent=os.curdir):
    # type: (Union[Sequence[str], str], str) -> List[str]
    """Parse a comma-separated list of paths.

    Each entry is run through :func:`normalize_path` relative to *parent*.

    :returns:
        The normalized paths.
    :rtype:
        [str]
    """
    return [
        normalize_path(path, parent)
        for path in parse_comma_separated_list(paths)
    ]
def normalize_path(path, parent=os.curdir):
    # type: (str, str) -> str
    """Normalize a single-path.

    :returns:
        The normalized path.
    :rtype:
        str
    """
    # NOTE(sigmavirus24): Using os.path.sep allows for Windows paths to
    # be specified and work appropriately.
    separator = os.path.sep
    if separator not in path:
        # Bare names (no separator) are left relative.
        return path.rstrip(separator)
    absolute = os.path.abspath(os.path.join(parent, path))
    return absolute.rstrip(separator)
def stdin_get_value():
    # type: () -> str
    """Get and cache it so plugins can use it."""
    cached = getattr(stdin_get_value, 'cached_stdin', None)
    if cached is None:
        # Read stdin exactly once; later calls reuse the cached buffer.
        contents = sys.stdin.read()
        if sys.version_info < (3, 0):
            buffer_class = io.BytesIO
        else:
            buffer_class = io.StringIO
        cached = buffer_class(contents)
        stdin_get_value.cached_stdin = cached
    return cached.getvalue()
def parse_unified_diff(diff=None):
    # type: (Optional[str]) -> Dict[str, Set[int]]
    """Parse the unified diff passed on stdin.

    :param str diff:
        Optional diff text; when ``None`` the diff is read from stdin.
    :returns:
        dictionary mapping file names to sets of line numbers
    :rtype:
        dict
    """
    # Allow us to not have to patch out stdin_get_value
    if diff is None:
        diff = stdin_get_value()
    number_of_rows = None
    current_path = None
    parsed_paths = collections.defaultdict(set)
    for line in diff.splitlines():
        if number_of_rows:
            # NOTE(sigmavirus24): Below we use a slice because stdin may be
            # bytes instead of text on Python 3.
            if line[:1] != '-':
                number_of_rows -= 1
            # We're in the part of the diff that has lines starting with +, -,
            # and ' ' to show context and the changes made. We skip these
            # because the information we care about is the filename and the
            # range within it.
            # When number_of_rows reaches 0, we will once again start
            # searching for filenames and ranges.
            continue
        # NOTE(sigmavirus24): Diffs that we support look roughly like:
        #    diff a/file.py b/file.py
        #    ...
        #    --- a/file.py
        #    +++ b/file.py
        # Below we're looking for that last line. Every diff tool that
        # gives us this output may have additional information after
        # ``b/file.py`` which it will separate with a \t, e.g.,
        #    +++ b/file.py\t100644
        # Which is an example that has the new file permissions/mode.
        # In this case we only care about the file name.
        if line[:3] == '+++':
            current_path = line[4:].split('\t', 1)[0]
            # NOTE(sigmavirus24): This check is for diff output from git.
            if current_path[:2] == 'b/':
                current_path = current_path[2:]
            # We don't need to do anything else. We have set up our local
            # ``current_path`` variable. We can skip the rest of this loop.
            # The next line we will see will give us the hunk information
            # which is in the next section of logic.
            continue
        hunk_match = DIFF_HUNK_REGEXP.match(line)
        # NOTE(sigmavirus24): pep8/pycodestyle check for:
        #    line[:3] == '@@ '
        # But the DIFF_HUNK_REGEXP enforces that the line start with that
        # So we can more simply check for a match instead of slicing and
        # comparing.
        if hunk_match:
            # A missing group means an implicit value of 1 (e.g. ``@@ -1 +1 @@``).
            (row, number_of_rows) = [
                1 if not group else int(group)
                for group in hunk_match.groups()
            ]
            parsed_paths[current_path].update(
                range(row, row + number_of_rows)
            )
    # We have now parsed our diff into a dictionary that looks like:
    #    {'file.py': set(range(10, 16), range(18, 20)), ...}
    return parsed_paths
def is_windows():
    # type: () -> bool
    """Determine if we're running on Windows.

    :returns:
        True if running on Windows, otherwise False
    :rtype:
        bool
    """
    windows_name = 'nt'
    return os.name == windows_name
def can_run_multiprocessing_on_windows():
    # type: () -> bool
    """Determine if we can use multiprocessing on Windows.

    :returns:
        True if the version of Python is modern enough, otherwise False
    :rtype:
        bool
    """
    version = sys.version_info
    return version >= (2, 7, 11) or version > (3, 2)
def is_using_stdin(paths):
    # type: (List[str]) -> bool
    """Determine if we're going to read from stdin.

    :param list paths:
        The paths that we're going to check.
    :returns:
        True if stdin (-) is in the path, otherwise False
    :rtype:
        bool
    """
    return any(path == '-' for path in paths)
def _default_predicate(*args):
    # Default exclusion predicate for filenames_from: never exclude anything.
    return False
def filenames_from(arg, predicate=None):
    # type: (str, callable) -> Generator
    """Generate filenames from an argument.

    :param str arg:
        Parameter from the command-line.
    :param callable predicate:
        Predicate to use to filter out filenames. If the predicate
        returns ``True`` we will exclude the filename, otherwise we
        will yield it. By default, we include every filename
        generated.
    :returns:
        Generator of paths
    """
    if predicate is None:
        predicate = _default_predicate
    if os.path.isdir(arg):
        for root, sub_directories, files in os.walk(arg):
            for filename in files:
                joined = os.path.join(root, filename)
                if predicate(joined):
                    continue
                yield joined
            # NOTE(sigmavirus24): os.walk() will skip a directory if you
            # remove it from the list of sub-directories.
            # Removing items from the list while iterating over it skips the
            # element after each removal, so filter into a new list and
            # assign it back in place instead.
            sub_directories[:] = [
                directory for directory in sub_directories
                if not predicate(directory)
            ]
    else:
        # Non-directory arguments are yielded as-is (the predicate is not
        # consulted here, matching historical behaviour).
        yield arg
def fnmatch(filename, patterns, default=True):
    # type: (str, List[str], bool) -> bool
    """Wrap :func:`fnmatch.fnmatch` to add some functionality.

    :param str filename:
        Name of the file we're trying to match.
    :param list patterns:
        Patterns we're using to try to match the filename.
    :param bool default:
        The default value if patterns is empty
    :returns:
        True if a pattern matches the filename, False if it doesn't.
        ``default`` if patterns is empty.
    """
    if not patterns:
        return default
    for pattern in patterns:
        if _fnmatch.fnmatch(filename, pattern):
            return True
    return False
def parameters_for(plugin):
    # type: (flake8.plugins.manager.Plugin) -> List[str]
    """Return the parameters for the plugin.

    This will inspect the plugin and return either the function parameters
    if the plugin is a function or the parameters for ``__init__`` after
    ``self`` if the plugin is a class.

    :param plugin:
        The internal plugin object.
    :type plugin:
        flake8.plugins.manager.Plugin
    :returns:
        Parameters to the plugin.
    :rtype:
        list(str)
    """
    target = plugin.plugin
    treat_as_class = not inspect.isfunction(target)
    if treat_as_class:
        # For a class we inspect its constructor, not the class itself.
        target = target.__init__
    if sys.version_info < (3, 3):
        names = inspect.getargspec(target)[0]
    else:
        signature = inspect.signature(target)
        names = [
            param.name
            for param in signature.parameters.values()
            if param.kind == param.POSITIONAL_OR_KEYWORD
        ]
    if treat_as_class:
        names.remove('self')
    return names