Move flake8 2 out of the way

This commit is contained in:
Ian Cordasco 2016-03-15 15:58:24 -05:00
parent 62a7cca512
commit 784a70dd0e
22 changed files with 0 additions and 0 deletions

1
old/flake8/__init__.py Normal file
View file

@ -0,0 +1 @@
# Version of this (legacy) flake8 2.x distribution.
__version__ = '2.5.4'

4
old/flake8/__main__.py Normal file
View file

@ -0,0 +1,4 @@
# python -m flake8 (with Python >= 2.7)
from flake8.main import main

main()

120
old/flake8/_pyflakes.py Normal file
View file

@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
try:
# The 'demandimport' breaks pyflakes and flake8._pyflakes
from mercurial import demandimport
except ImportError:
pass
else:
demandimport.disable()
import os
import pep8
import pyflakes
import pyflakes.checker
def patch_pyflakes():
    """Add error codes to Pyflakes messages."""
    # Build a message-class-name -> flake8-code table from the listing.
    code_by_message = {}
    for table_entry in (
        'F401 UnusedImport',
        'F402 ImportShadowedByLoopVar',
        'F403 ImportStarUsed',
        'F404 LateFutureImport',
        'F810 Redefined',  # XXX Obsolete?
        'F811 RedefinedWhileUnused',
        'F812 RedefinedInListComp',
        'F821 UndefinedName',
        'F822 UndefinedExport',
        'F823 UndefinedLocal',
        'F831 DuplicateArgument',
        'F841 UnusedVariable',
    ):
        error_code, message_name = table_entry.split()
        code_by_message[message_name] = error_code

    # Attach a code-prefixed message to every public message class;
    # anything not in the table falls back to the catch-all F999.
    for attr, message_class in vars(pyflakes.messages).items():
        if attr[0].isupper() and message_class.message:
            message_class.flake8_msg = '%s %s' % (
                code_by_message.get(attr, 'F999'), message_class.message)
patch_pyflakes()
class FlakesChecker(pyflakes.checker.Checker):
    """Subclass the Pyflakes checker to conform with the flake8 API."""

    name = 'pyflakes'
    version = pyflakes.__version__

    def __init__(self, tree, filename):
        # pep8.normalize_paths returns a list; we only pass one filename.
        filename = pep8.normalize_paths(filename)[0]
        withDoctest = self.withDoctest
        included_by = [include for include in self.include_in_doctest
                       if include != '' and filename.startswith(include)]
        if included_by:
            withDoctest = True

        for exclude in self.exclude_from_doctest:
            if exclude != '' and filename.startswith(exclude):
                withDoctest = False
                # An include nested inside this exclude re-enables
                # doctest checking for the file.
                overlaped_by = [include for include in included_by
                                if include.startswith(exclude)]

                if overlaped_by:
                    withDoctest = True

        super(FlakesChecker, self).__init__(tree, filename,
                                            withDoctest=withDoctest)

    @classmethod
    def add_options(cls, parser):
        # Register pyflakes-specific command-line/config options.
        parser.add_option('--builtins',
                          help="define more built-ins, comma separated")
        parser.add_option('--doctests', default=False, action='store_true',
                          help="check syntax of the doctests")
        parser.add_option('--include-in-doctest', default='',
                          dest='include_in_doctest',
                          help='Run doctests only on these files',
                          type='string')
        parser.add_option('--exclude-from-doctest', default='',
                          dest='exclude_from_doctest',
                          help='Skip these files when running doctests',
                          type='string')
        parser.config_options.extend(['builtins', 'doctests',
                                      'include-in-doctest',
                                      'exclude-from-doctest'])

    @classmethod
    def parse_options(cls, options):
        if options.builtins:
            cls.builtIns = cls.builtIns.union(options.builtins.split(','))
        cls.withDoctest = options.doctests

        included_files = []
        for included_file in options.include_in_doctest.split(','):
            if included_file == '':
                continue
            # Anchor bare relative entries so the startswith comparisons
            # in __init__ behave predictably after normalization.
            if not included_file.startswith((os.sep, './', '~/')):
                included_files.append('./' + included_file)
            else:
                included_files.append(included_file)
        cls.include_in_doctest = pep8.normalize_paths(','.join(included_files))

        excluded_files = []
        for excluded_file in options.exclude_from_doctest.split(','):
            if excluded_file == '':
                continue
            if not excluded_file.startswith((os.sep, './', '~/')):
                excluded_files.append('./' + excluded_file)
            else:
                excluded_files.append(excluded_file)
        cls.exclude_from_doctest = pep8.normalize_paths(
            ','.join(excluded_files))

        # A path in both lists is ambiguous; refuse to guess.
        inc_exc = set(cls.include_in_doctest).intersection(
            set(cls.exclude_from_doctest))
        if inc_exc:
            raise ValueError('"%s" was specified in both the '
                             'include-in-doctest and exclude-from-doctest '
                             'options. You are not allowed to specify it in '
                             'both for doctesting.' % inc_exc)

    def run(self):
        # Yield flake8-style (line, col, text, type) tuples; older
        # pyflakes messages may lack a ``col`` attribute.
        for m in self.messages:
            col = getattr(m, 'col', 0)
            yield m.lineno, col, (m.flake8_msg % m.message_args), m.__class__

27
old/flake8/callbacks.py Normal file
View file

@ -0,0 +1,27 @@
import atexit
import sys
def install_vcs_hook(option, option_str, value, parser):
    """optparse callback that arms hook installation.

    For now, there's no way to affect a change in how pep8 processes
    options.  If no args are provided and there's no config file present,
    it will error out because no input was provided.  To get around this,
    when we're using --install-hook, we'll say that there were arguments so
    we can actually attempt to install the hook.
    See: https://gitlab.com/pycqa/flake8/issues/2 and
    https://github.com/jcrocholl/pep8/blob/4c5bf00cb613be617c7f48d3b2b82a1c7b895ac1/pep8.py#L1912
    for more context.
    """
    parser.values.install_hook = True
    parser.rargs.append('.')
def restore_stdout(old_stdout):
    """Close the redirect target and put the original stdout back."""
    redirected = sys.stdout
    redirected.close()
    sys.stdout = old_stdout
def redirect_stdout(option, option_str, value, parser):
    """optparse callback for --output-file: write the report to *value*.

    The file stays open for the life of the process; the original stdout
    is restored (and the file closed) at interpreter exit via atexit.
    """
    fd = open(value, 'w')
    old_stdout, sys.stdout = sys.stdout, fd
    atexit.register(restore_stdout, old_stdout)

12
old/flake8/compat.py Normal file
View file

@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
"""Compatibility shims for Flake8."""
import os.path
import sys
def relpath(path, start='.'):
    """Wallpaper over the differences between 2.6 and newer versions."""
    # Python 2.6's os.path.relpath lacks the ``start`` keyword, so fall
    # back to naive prefix stripping there.
    legacy = sys.version_info < (2, 7)
    if legacy and path.startswith(start):
        return path[len(start):]
    return os.path.relpath(path, start=start)

316
old/flake8/engine.py Normal file
View file

@ -0,0 +1,316 @@
# -*- coding: utf-8 -*-
import errno
import io
import platform
import re
import sys
import warnings
import pep8
from flake8 import __version__
from flake8 import callbacks
from flake8.reporter import (multiprocessing, BaseQReport, FileQReport,
QueueReport)
from flake8 import util
# Matches file-level "# flake8: noqa" (or "# flake8=noqa") suppression
# comments, case-insensitively.
_flake8_noqa = re.compile(r'\s*# flake8[:=]\s*noqa', re.I).search

# Patterns excluded in addition to pep8's built-in defaults.
EXTRA_EXCLUDE = ['.tox', '.eggs', '*.egg']

# Teach pep8 to also read project configuration from a .flake8 file.
pep8.PROJECT_CONFIG += ('.flake8',)
def _load_entry_point(entry_point, verify_requirements):
"""Based on the version of setuptools load an entry-point correctly.
setuptools 11.3 deprecated `require=False` in the call to EntryPoint.load.
To load entry points correctly after that without requiring all
dependencies be present, the proper way is to call EntryPoint.resolve.
This function will provide backwards compatibility for older versions of
setuptools while also ensuring we do the right thing for the future.
"""
if hasattr(entry_point, 'resolve') and hasattr(entry_point, 'require'):
if verify_requirements:
entry_point.require()
plugin = entry_point.resolve()
else:
plugin = entry_point.load(require=verify_requirements)
return plugin
def _register_extensions():
    """Register all the extensions."""
    extensions = util.OrderedSet()
    extensions.add(('pep8', pep8.__version__))
    parser_hooks = []   # plugins' add_options callbacks
    options_hooks = []  # plugins' parse_options callbacks
    ignored_hooks = []  # entry names of plugins that are off by default
    try:
        from pkg_resources import iter_entry_points
    except ImportError:
        # setuptools unavailable: only the built-in pep8 checks run.
        pass
    else:
        for entry in iter_entry_points('flake8.extension'):
            # Do not verify that the requirements versions are valid
            checker = _load_entry_point(entry, verify_requirements=False)
            pep8.register_check(checker, codes=[entry.name])
            extensions.add((checker.name, checker.version))
            if hasattr(checker, 'add_options'):
                parser_hooks.append(checker.add_options)
            if hasattr(checker, 'parse_options'):
                options_hooks.append(checker.parse_options)
            if getattr(checker, 'off_by_default', False) is True:
                ignored_hooks.append(entry.name)
    return extensions, parser_hooks, options_hooks, ignored_hooks
def get_parser():
    """This returns an instance of optparse.OptionParser with all the
    extensions registered and options set. This wraps ``pep8.get_parser``.
    """
    (extensions, parser_hooks, options_hooks, ignored) = _register_extensions()
    details = ', '.join('%s: %s' % ext for ext in extensions)
    python_version = get_python_version()
    parser = pep8.get_parser('flake8', '%s (%s) %s' % (
        __version__, details, python_version
    ))
    # These pep8 options do not apply to flake8; drop them if present.
    for opt in ('--repeat', '--testsuite', '--doctest'):
        try:
            parser.remove_option(opt)
        except ValueError:
            pass

    if multiprocessing:
        parser.config_options.append('jobs')
        parser.add_option('-j', '--jobs', type='string', default='auto',
                          help="number of jobs to run simultaneously, "
                          "or 'auto'. This is ignored on Windows.")

    parser.add_option('--exit-zero', action='store_true',
                      help="exit with code 0 even if there are errors")
    # Let each plugin register its own options.
    for parser_hook in parser_hooks:
        parser_hook(parser)
    # See the comment in flake8.callbacks regarding why this has to be a
    # callback.
    parser.add_option('--install-hook', default=False, dest='install_hook',
                      help='Install the appropriate hook for this '
                      'repository.', action='callback',
                      callback=callbacks.install_vcs_hook)
    parser.add_option('--output-file', default=None,
                      help='Redirect report to a file.',
                      type='string', nargs=1, action='callback',
                      callback=callbacks.redirect_stdout)
    parser.add_option('--enable-extensions', default='',
                      dest='enable_extensions',
                      help='Enable plugins and extensions that are disabled '
                           'by default',
                      type='string')
    parser.config_options.extend(['output-file', 'enable-extensions'])
    parser.ignored_extensions = ignored
    return parser, options_hooks
class NoQAStyleGuide(pep8.StyleGuide):
    """StyleGuide that honours file-level ``# flake8: noqa`` comments."""

    def input_file(self, filename, lines=None, expected=None, line_offset=0):
        """Run all checks on a Python source file."""
        if self.options.verbose:
            print('checking %s' % filename)
        fchecker = self.checker_class(
            filename, lines=lines, options=self.options)
        # Any "flake8: noqa" comments to ignore the entire file?
        if any(_flake8_noqa(line) for line in fchecker.lines):
            return 0
        return fchecker.check_all(expected=expected, line_offset=line_offset)
class StyleGuide(object):
    """A wrapper StyleGuide object for Flake8 usage.

    This allows for OSErrors to be caught in the styleguide and special logic
    to be used to handle those errors.
    """

    # Reasoning for error numbers is in-line below
    serial_retry_errors = set([
        # ENOSPC: Added by sigmavirus24
        # > On some operating systems (OSX), multiprocessing may cause an
        # > ENOSPC error while trying to trying to create a Semaphore.
        # > In those cases, we should replace the customized Queue Report
        # > class with pep8's StandardReport class to ensure users don't run
        # > into this problem.
        # > (See also: https://gitlab.com/pycqa/flake8/issues/74)
        errno.ENOSPC,
        # NOTE(sigmavirus24): When adding to this list, include the reasoning
        # on the lines before the error code and always append your error
        # code. Further, please always add a trailing `,` to reduce the visual
        # noise in diffs.
    ])

    def __init__(self, **kwargs):
        # This allows us to inject a mocked StyleGuide in the tests.
        self._styleguide = kwargs.pop('styleguide', NoQAStyleGuide(**kwargs))

    @property
    def options(self):
        return self._styleguide.options

    @property
    def paths(self):
        return self._styleguide.paths

    def _retry_serial(self, func, *args, **kwargs):
        """This will retry the passed function in serial if necessary.

        In the event that we encounter an OSError with an errno in
        :attr:`serial_retry_errors`, this function will retry this function
        using pep8's default Report class which operates in serial.
        """
        try:
            return func(*args, **kwargs)
        except OSError as oserr:
            if oserr.errno in self.serial_retry_errors:
                # Swap in pep8's serial StandardReport before retrying.
                self.init_report(pep8.StandardReport)
            else:
                raise
            return func(*args, **kwargs)

    def check_files(self, paths=None):
        return self._retry_serial(self._styleguide.check_files, paths=paths)

    def excluded(self, filename, parent=None):
        return self._styleguide.excluded(filename, parent=parent)

    def init_report(self, reporter=None):
        return self._styleguide.init_report(reporter)

    def input_file(self, filename, lines=None, expected=None, line_offset=0):
        return self._retry_serial(
            self._styleguide.input_file,
            filename=filename,
            lines=lines,
            expected=expected,
            line_offset=line_offset,
        )
def _parse_multi_options(options, split_token=','):
r"""Split and strip and discard empties.
Turns the following:
A,
B,
into ["A", "B"].
Credit: Kristian Glass as contributed to pep8
"""
if options:
return [o.strip() for o in options.split(split_token) if o.strip()]
else:
return options
def _disable_extensions(parser, options):
    """Fold off-by-default extensions into ``options.ignore``.

    Extensions the user explicitly enabled via --enable-extensions are
    removed from the default-ignored set first; whatever remains is
    merged into options.ignore.
    """
    off_by_default = set(getattr(parser, 'ignored_extensions', []))
    explicitly_enabled = set(_parse_multi_options(options.enable_extensions))
    still_ignored = off_by_default - explicitly_enabled
    options.ignore = tuple(still_ignored.union(options.ignore))
def get_style_guide(**kwargs):
    """Parse the options and configure the checker. This returns a sub-class
    of ``pep8.StyleGuide``."""
    kwargs['parser'], options_hooks = get_parser()
    styleguide = StyleGuide(**kwargs)
    options = styleguide.options
    _disable_extensions(kwargs['parser'], options)

    # Normalize excludes into a list so EXTRA_EXCLUDE can be appended.
    if options.exclude and not isinstance(options.exclude, list):
        options.exclude = pep8.normalize_paths(options.exclude)
    elif not options.exclude:
        options.exclude = []

    # Add patterns in EXTRA_EXCLUDE to the list of excluded patterns
    options.exclude.extend(pep8.normalize_paths(EXTRA_EXCLUDE))

    # Let plugins post-process the parsed options.
    for options_hook in options_hooks:
        options_hook(options)

    if util.warn_when_using_jobs(options):
        if not multiprocessing:
            warnings.warn("The multiprocessing module is not available. "
                          "Ignoring --jobs arguments.")
        if util.is_windows():
            warnings.warn("The --jobs option is not available on Windows. "
                          "Ignoring --jobs arguments.")
        if util.is_using_stdin(styleguide.paths):
            warnings.warn("The --jobs option is not compatible with supplying "
                          "input using - . Ignoring --jobs arguments.")
        if options.diff:
            warnings.warn("The --diff option was specified with --jobs but "
                          "they are not compatible. Ignoring --jobs arguments."
                          )

    if options.diff:
        # Checking a diff cannot be parallelized.
        options.jobs = None

    force_disable_jobs = util.force_disable_jobs(styleguide)

    if multiprocessing and options.jobs and not force_disable_jobs:
        # --jobs is a string: either a digit count or 'auto'.
        if options.jobs.isdigit():
            n_jobs = int(options.jobs)
        else:
            try:
                n_jobs = multiprocessing.cpu_count()
            except NotImplementedError:
                n_jobs = 1
        if n_jobs > 1:
            options.jobs = n_jobs
            # Pick the queue-based reporter matching the verbosity level.
            reporter = QueueReport
            if options.quiet:
                reporter = BaseQReport
                if options.quiet == 1:
                    reporter = FileQReport
            report = styleguide.init_report(reporter)
            report.input_file = styleguide.input_file
            styleguide.runner = report.task_queue.put

    return styleguide
def get_python_version():
    """Return e.g. 'CPython 3.11.4 on Linux' for the running interpreter."""
    # The implementation isn't all that important.
    try:
        implementation = platform.python_implementation() + " "
    except AttributeError:  # Python 2.5 lacks python_implementation()
        implementation = ''
    return '%s%s on %s' % (implementation,
                           platform.python_version(),
                           platform.system())
def make_stdin_get_value(original):
    """Wrap *original* so stdin is read once and replayed on later calls."""
    def stdin_get_value():
        cached = getattr(stdin_get_value, 'cached_stdin', None)
        if cached is None:
            raw = original()
            # Python 2 hands back bytes; Python 3 hands back text.
            if sys.version_info < (3, 0):
                cached = io.BytesIO(raw)
            else:
                cached = io.StringIO(raw)
            stdin_get_value.cached_stdin = cached
        return cached.getvalue()
    return stdin_get_value
# Patch pep8 so repeated stdin reads (e.g. one per job) all see the same
# cached contents instead of an exhausted stream.
pep8.stdin_get_value = make_stdin_get_value(pep8.stdin_get_value)

297
old/flake8/hooks.py Normal file
View file

@ -0,0 +1,297 @@
# -*- coding: utf-8 -*-
from __future__ import with_statement
import os
import pep8
import sys
import stat
from subprocess import Popen, PIPE
import shutil
import tempfile
try:
from configparser import ConfigParser
except ImportError: # Python 2
from ConfigParser import ConfigParser
from flake8 import compat
from flake8.engine import get_parser, get_style_guide
from flake8.main import DEFAULT_CONFIG
def git_hook(complexity=-1, strict=False, ignore=None, lazy=False):
    """This is the function used by the git hook.

    :param int complexity: (optional), any value > 0 enables complexity
        checking with mccabe
    :param bool strict: (optional), if True, this returns the total number of
        errors which will cause the hook to fail
    :param str ignore: (optional), a comma-separated list of errors and
        warnings to ignore
    :param bool lazy: (optional), allows for the instances where you don't add
        the files to the index before running a commit, e.g., git commit -a
    :returns: total number of errors if strict is True, otherwise 0
    """
    gitcmd = "git diff-index --cached --name-only --diff-filter=ACMRTUXB HEAD"
    if lazy:
        # Catch all files, including those not added to the index
        gitcmd = gitcmd.replace('--cached ', '')

    if hasattr(ignore, 'split'):
        ignore = ignore.split(',')

    # Returns the exit code, list of files modified, list of error messages
    _, files_modified, _ = run(gitcmd)

    # We only want to pass ignore and max_complexity if they differ from the
    # defaults so that we don't override a local configuration file
    options = {}
    if ignore:
        options['ignore'] = ignore
    if complexity > -1:
        options['max_complexity'] = complexity

    tmpdir = tempfile.mkdtemp()

    flake8_style = get_style_guide(config_file=DEFAULT_CONFIG, paths=['.'],
                                   **options)
    filepatterns = flake8_style.options.filename

    # Copy staged versions to temporary directory
    files_to_check = []
    try:
        for file_ in files_modified:
            # get the staged version of the file
            gitcmd_getstaged = "git show :%s" % file_
            _, out, _ = run(gitcmd_getstaged, raw_output=True, decode=False)

            # write the staged version to temp dir with its full path to
            # avoid overwriting files with the same name
            dirname, filename = os.path.split(os.path.abspath(file_))
            prefix = os.path.commonprefix([dirname, tmpdir])
            dirname = compat.relpath(dirname, start=prefix)
            dirname = os.path.join(tmpdir, dirname)
            if not os.path.isdir(dirname):
                os.makedirs(dirname)

            # check_files() only does this check if passed a dir; so we do it
            if ((pep8.filename_match(file_, filepatterns) and
                 not flake8_style.excluded(file_))):
                filename = os.path.join(dirname, filename)
                files_to_check.append(filename)
                # write staged version of file to temporary directory
                with open(filename, "wb") as fh:
                    fh.write(out)

        # Run the checks
        report = flake8_style.check_files(files_to_check)
    # remove temporary directory
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)

    if strict:
        return report.total_errors

    return 0
def hg_hook(ui, repo, **kwargs):
    """This is the function executed directly by Mercurial as part of the
    hook. This is never called directly by the user, so the parameters are
    undocumented. If you would like to learn more about them, please feel free
    to read the official Mercurial documentation.
    """
    complexity = ui.config('flake8', 'complexity', default=-1)
    strict = ui.configbool('flake8', 'strict', default=True)
    ignore = ui.config('flake8', 'ignore', default=None)
    config = ui.config('flake8', 'config', default=DEFAULT_CONFIG)

    paths = _get_files(repo, **kwargs)

    # We only want to pass ignore and max_complexity if they differ from the
    # defaults so that we don't override a local configuration file
    options = {}
    if ignore:
        options['ignore'] = ignore
    if complexity > -1:
        options['max_complexity'] = complexity

    flake8_style = get_style_guide(config_file=config, paths=['.'],
                                   **options)
    report = flake8_style.check_files(paths)

    if strict:
        return report.total_errors

    return 0
def run(command, raw_output=False, decode=True):
    """Run *command* and return ``(returncode, stdout, stderr)``.

    By default output is decoded from UTF-8 and split into stripped
    lines; pass ``raw_output=True`` to skip the splitting and
    ``decode=False`` to keep raw bytes.
    """
    process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
    stdout, stderr = process.communicate()
    # On python 3, subprocess.Popen returns bytes objects which expect
    # endswith to be given a bytes object or a tuple of bytes but not native
    # string objects. This is simply less mysterious than using b'.py' in the
    # endswith method. That should work but might still fail horribly.
    if decode:
        if hasattr(stdout, 'decode'):
            stdout = stdout.decode('utf-8')
        if hasattr(stderr, 'decode'):
            stderr = stderr.decode('utf-8')
    if raw_output:
        return (process.returncode, stdout, stderr)
    stdout_lines = [line.strip() for line in stdout.splitlines()]
    stderr_lines = [line.strip() for line in stderr.splitlines()]
    return (process.returncode, stdout_lines, stderr_lines)
def _get_files(repo, **kwargs):
    """Yield absolute paths of unique, existing .py files changed since
    the revision named by ``kwargs['node']``."""
    seen = set()
    for rev in range(repo[kwargs['node']], len(repo)):
        for file_ in repo[rev].files():
            file_ = os.path.join(repo.root, file_)
            # Skip duplicates and files deleted in later revisions.
            if file_ in seen or not os.path.exists(file_):
                continue
            seen.add(file_)
            if file_.endswith('.py'):
                yield file_
def find_vcs():
    """Return the path where a VCS hook should be installed.

    Prefers git (returning the pre-commit hook path, creating the hooks
    directory if missing), then mercurial (the repo's .hg/hgrc), and
    returns '' when neither is found.
    """
    try:
        _, git_dir, _ = run('git rev-parse --git-dir')
    except OSError:
        # git binary not available
        pass
    else:
        if git_dir and os.path.isdir(git_dir[0]):
            if not os.path.isdir(os.path.join(git_dir[0], 'hooks')):
                os.mkdir(os.path.join(git_dir[0], 'hooks'))
            return os.path.join(git_dir[0], 'hooks', 'pre-commit')

    try:
        _, hg_dir, _ = run('hg root')
    except OSError:
        # hg binary not available
        pass
    else:
        if hg_dir and os.path.isdir(hg_dir[0]):
            return os.path.join(hg_dir[0], '.hg', 'hgrc')

    return ''
def get_git_config(option, opt_type='', convert_type=True):
    """Read *option* via ``git config --get``, optionally converting it."""
    # type can be --bool, --int or an empty string
    _, git_cfg_value, _ = run('git config --get %s %s' % (opt_type, option),
                              raw_output=True)
    git_cfg_value = git_cfg_value.strip()
    if not convert_type:
        return git_cfg_value
    if opt_type == '--bool':
        # git prints 'true'/'false' for --bool options.
        git_cfg_value = git_cfg_value.lower() == 'true'
    elif git_cfg_value and opt_type == '--int':
        git_cfg_value = int(git_cfg_value)
    return git_cfg_value
# Maps hook parameter names to the git-config type flag used to read them.
_params = {
    'FLAKE8_COMPLEXITY': '--int',
    'FLAKE8_STRICT': '--bool',
    'FLAKE8_IGNORE': '',
    'FLAKE8_LAZY': '--bool',
}


def get_git_param(option, default=''):
    """Resolve *option* from git config, then the environment, then
    *default*, converting to the type recorded in ``_params``."""
    global _params
    opt_type = _params[option]
    param_value = get_git_config(option.lower().replace('_', '.'),
                                 opt_type=opt_type, convert_type=False)
    if param_value == '':
        param_value = os.environ.get(option, default)
    if opt_type == '--bool' and not isinstance(param_value, bool):
        param_value = param_value.lower() == 'true'
    elif param_value and opt_type == '--int':
        param_value = int(param_value)
    return param_value
git_hook_file = """#!/usr/bin/env python
import sys
from flake8.hooks import git_hook, get_git_param
# `get_git_param` will retrieve configuration from your local git config and
# then fall back to using the environment variables that the hook has always
# supported.
# For example, to set the complexity, you'll need to do:
# git config flake8.complexity 10
COMPLEXITY = get_git_param('FLAKE8_COMPLEXITY', 10)
STRICT = get_git_param('FLAKE8_STRICT', False)
IGNORE = get_git_param('FLAKE8_IGNORE', None)
LAZY = get_git_param('FLAKE8_LAZY', False)
if __name__ == '__main__':
sys.exit(git_hook(
complexity=COMPLEXITY,
strict=STRICT,
ignore=IGNORE,
lazy=LAZY,
))
"""
def _install_hg_hook(path):
getenv = os.environ.get
if not os.path.isfile(path):
# Make the file so we can avoid IOError's
open(path, 'w').close()
c = ConfigParser()
c.readfp(open(path, 'r'))
if not c.has_section('hooks'):
c.add_section('hooks')
if not c.has_option('hooks', 'commit'):
c.set('hooks', 'commit', 'python:flake8.hooks.hg_hook')
if not c.has_option('hooks', 'qrefresh'):
c.set('hooks', 'qrefresh', 'python:flake8.hooks.hg_hook')
if not c.has_section('flake8'):
c.add_section('flake8')
if not c.has_option('flake8', 'complexity'):
c.set('flake8', 'complexity', str(getenv('FLAKE8_COMPLEXITY', 10)))
if not c.has_option('flake8', 'strict'):
c.set('flake8', 'strict', getenv('FLAKE8_STRICT', False))
if not c.has_option('flake8', 'ignore'):
c.set('flake8', 'ignore', getenv('FLAKE8_IGNORE', ''))
if not c.has_option('flake8', 'lazy'):
c.set('flake8', 'lazy', getenv('FLAKE8_LAZY', False))
with open(path, 'w') as fd:
c.write(fd)
def install_hook():
    """Install the flake8 hook for the VCS found in the current directory.

    Exits the process: status 0 on success, 1 when no repository is found
    or the VCS is unrecognized; refuses to clobber an existing git hook.
    """
    vcs = find_vcs()

    if not vcs:
        p = get_parser()[0]
        sys.stderr.write('Error: could not find either a git or mercurial '
                         'directory. Please re-run this in a proper '
                         'repository.\n')
        p.print_help()
        sys.exit(1)

    status = 0
    if 'git' in vcs:
        if os.path.exists(vcs):
            sys.exit('Error: hook already exists (%s)' % vcs)
        with open(vcs, 'w') as fd:
            fd.write(git_hook_file)
        # rwxr--r--
        os.chmod(vcs, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
    elif 'hg' in vcs:
        _install_hg_hook(vcs)
    else:
        status = 1
    sys.exit(status)

142
old/flake8/main.py Normal file
View file

@ -0,0 +1,142 @@
# -*- coding: utf-8 -*-
import os
import re
import sys
import setuptools
from flake8.engine import get_parser, get_style_guide
from flake8.util import option_normalizer
# Per-user config location: ~\.flake8 on Windows, the XDG config directory
# (usually ~/.config/flake8) elsewhere.
if sys.platform.startswith('win'):
    DEFAULT_CONFIG = os.path.expanduser(r'~\.flake8')
else:
    DEFAULT_CONFIG = os.path.join(
        os.getenv('XDG_CONFIG_HOME') or os.path.expanduser('~/.config'),
        'flake8'
    )

# Codes unconditionally added to the ignore set by check_file/check_code.
EXTRA_IGNORE = []
def main():
    """Parse options and run checks on Python source."""
    # Prepare
    flake8_style = get_style_guide(parse_argv=True, config_file=DEFAULT_CONFIG)
    options = flake8_style.options

    if options.install_hook:
        from flake8.hooks import install_hook
        install_hook()

    # Run the checkers
    report = flake8_style.check_files()

    exit_code = print_report(report, flake8_style)
    if exit_code > 0:
        # Raise with the numeric exit code; the original raised
        # SystemExit(exit_code > 0), i.e. a boolean, which obscures the
        # status and would drop any future non-1 code.
        raise SystemExit(exit_code)
def print_report(report, flake8_style):
    """Print the final summary and return the process exit code (0 or 1)."""
    options = flake8_style.options

    if options.statistics:
        report.print_statistics()
    if options.benchmark:
        report.print_benchmark()

    if not report.total_errors:
        return 0
    if options.count:
        sys.stderr.write(str(report.total_errors) + '\n')
    # --exit-zero turns a failing run into a successful exit status.
    return 0 if options.exit_zero else 1
def check_file(path, ignore=(), complexity=-1):
    """Checks a file using pep8 and pyflakes by default and mccabe
    optionally.

    :param str path: path to the file to be checked
    :param tuple ignore: (optional), error and warning codes to be ignored
    :param int complexity: (optional), enables the mccabe check for values > 0
    """
    ignore = set(ignore).union(EXTRA_IGNORE)
    flake8_style = get_style_guide(
        config_file=DEFAULT_CONFIG, ignore=ignore, max_complexity=complexity)
    return flake8_style.input_file(path)
def check_code(code, ignore=(), complexity=-1):
    """Checks code using pep8 and pyflakes by default and mccabe optionally.

    :param str code: code to be checked
    :param tuple ignore: (optional), error and warning codes to be ignored
    :param int complexity: (optional), enables the mccabe check for values > 0
    """
    ignore = set(ignore).union(EXTRA_IGNORE)
    flake8_style = get_style_guide(
        config_file=DEFAULT_CONFIG, ignore=ignore, max_complexity=complexity)
    # splitlines(True) keeps line endings, matching how files are read.
    return flake8_style.input_file(None, lines=code.splitlines(True))
class Flake8Command(setuptools.Command):
    """The :class:`Flake8Command` class is used by setuptools to perform
    checks on registered modules.
    """

    description = "Run flake8 on modules registered in setuptools"
    user_options = []

    def initialize_options(self):
        # Expose every long option of the flake8 parser as a command
        # attribute so setuptools can populate it from setup.cfg or the
        # command line.
        self.option_to_cmds = {}
        parser = get_parser()[0]
        for opt in parser.option_list:
            cmd_name = opt._long_opts[0][2:]
            option_name = cmd_name.replace('-', '_')
            self.option_to_cmds[option_name] = opt
            setattr(self, option_name, None)

    def finalize_options(self):
        # Collect only the options the user actually supplied.
        self.options_dict = {}
        for (option_name, opt) in self.option_to_cmds.items():
            if option_name in ['help', 'verbose']:
                continue
            value = getattr(self, option_name)
            if value is None:
                continue
            value = option_normalizer(value, opt, option_name)
            # Check if there's any values that need to be fixed.
            if option_name == "include" and isinstance(value, str):
                # Raw string: the original '[^,;\s]+' relied on an invalid
                # string escape, which is a SyntaxWarning (and eventually an
                # error) on modern Python.
                value = re.findall(r'[^,;\s]+', value)
            self.options_dict[option_name] = value

    def distribution_files(self):
        """Yield the package directories, modules, and setup.py to check."""
        if self.distribution.packages:
            package_dirs = self.distribution.package_dir or {}
            for package in self.distribution.packages:
                pkg_dir = package
                if package in package_dirs:
                    pkg_dir = package_dirs[package]
                elif '' in package_dirs:
                    pkg_dir = package_dirs[''] + os.path.sep + pkg_dir
                yield pkg_dir.replace('.', os.path.sep)

        if self.distribution.py_modules:
            for filename in self.distribution.py_modules:
                yield "%s.py" % filename
        # Don't miss the setup.py file itself
        yield "setup.py"

    def run(self):
        # Prepare
        paths = list(self.distribution_files())
        flake8_style = get_style_guide(config_file=DEFAULT_CONFIG,
                                       paths=paths,
                                       **self.options_dict)

        # Run the checkers
        report = flake8_style.check_files()
        exit_code = print_report(report, flake8_style)
        if exit_code > 0:
            # Raise with the numeric code, not the boolean ``exit_code > 0``.
            raise SystemExit(exit_code)

152
old/flake8/reporter.py Normal file
View file

@ -0,0 +1,152 @@
# -*- coding: utf-8 -*-
# Adapted from a contribution of Johan Dahlin
import collections
import errno
import re
import sys
try:
import multiprocessing
except ImportError: # Python 2.5
multiprocessing = None
import pep8
# Public surface used by flake8.engine (multiprocessing may be None here).
__all__ = ['multiprocessing', 'BaseQReport', 'QueueReport']
class BaseQReport(pep8.BaseReport):
    """Base Queue Report."""

    _loaded = False  # Windows support

    # Reasoning for ignored error numbers is in-line below
    ignored_errors = set([
        # EPIPE: Added by sigmavirus24
        # > If output during processing is piped to something that may close
        # > its own stdin before we've finished printing results, we need to
        # > catch a Broken pipe error and continue on.
        # > (See also: https://gitlab.com/pycqa/flake8/issues/69)
        errno.EPIPE,
        # NOTE(sigmavirus24): When adding to this list, include the reasoning
        # on the lines before the error code and always append your error
        # code. Further, please always add a trailing `,` to reduce the visual
        # noise in diffs.
    ])

    def __init__(self, options):
        assert options.jobs > 0
        super(BaseQReport, self).__init__(options)
        self.counters = collections.defaultdict(int)
        self.n_jobs = options.jobs

        # init queues
        self.task_queue = multiprocessing.Queue()
        self.result_queue = multiprocessing.Queue()
        if sys.platform == 'win32':
            # Work around http://bugs.python.org/issue10845
            sys.modules['__main__'].__file__ = __file__

    def _cleanup_queue(self, queue):
        # Drain without blocking so workers are not left stuck on a put.
        while not queue.empty():
            queue.get_nowait()

    def _put_done(self):
        # collect queues: one DONE sentinel per worker, then merge each
        # worker's final state.
        for i in range(self.n_jobs):
            self.task_queue.put('DONE')
            self.update_state(self.result_queue.get())

    def _process_main(self):
        if not self._loaded:
            # Windows needs to parse again the configuration
            from flake8.main import get_style_guide, DEFAULT_CONFIG
            get_style_guide(parse_argv=True, config_file=DEFAULT_CONFIG)
        # Consume filenames until the DONE sentinel arrives.
        for filename in iter(self.task_queue.get, 'DONE'):
            self.input_file(filename)

    def start(self):
        super(BaseQReport, self).start()
        self.__class__._loaded = True
        # spawn processes
        for i in range(self.n_jobs):
            p = multiprocessing.Process(target=self.process_main)
            p.daemon = True
            p.start()

    def stop(self):
        try:
            self._put_done()
        except KeyboardInterrupt:
            pass
        finally:
            # cleanup queues to unlock threads
            self._cleanup_queue(self.result_queue)
            self._cleanup_queue(self.task_queue)
            super(BaseQReport, self).stop()

    def process_main(self):
        # Worker entry point; always reports its state back to the parent.
        try:
            self._process_main()
        except KeyboardInterrupt:
            pass
        except IOError as ioerr:
            # If we happen across an IOError that we aren't certain can/should
            # be ignored, we should re-raise the exception.
            if ioerr.errno not in self.ignored_errors:
                raise
        finally:
            # ensure all output is flushed before main process continues
            sys.stdout.flush()
            sys.stderr.flush()
            self.result_queue.put(self.get_state())

    def get_state(self):
        # Snapshot of this worker's tallies, sent over the result queue.
        return {'total_errors': self.total_errors,
                'counters': self.counters,
                'messages': self.messages}

    def update_state(self, state):
        # Merge a worker's snapshot into the parent report.
        self.total_errors += state['total_errors']
        for key, value in state['counters'].items():
            self.counters[key] += value
        self.messages.update(state['messages'])
class FileQReport(BaseQReport):
    """File Queue Report."""

    # pep8 BaseReport flag; presumably makes the report emit each file's
    # name with its results — confirm against pep8's report classes.
    print_filename = True
class QueueReport(pep8.StandardReport, BaseQReport):
    """Standard Queue Report."""

    def get_file_results(self):
        """Print the result and return the overall count for this file."""
        self._deferred_print.sort()
        for line_number, offset, code, text, doc in self._deferred_print:
            print(self._fmt % {
                'path': self.filename,
                'row': self.line_offset + line_number, 'col': offset + 1,
                'code': code, 'text': text,
            })
            # stdout is block buffered when not stdout.isatty().
            # line can be broken where buffer boundary since other processes
            # write to same file.
            # flush() after print() to avoid buffer boundary.
            # Typical buffer size is 8192. line written safely when
            # len(line) < 8192.
            sys.stdout.flush()
            if self._show_source:
                if line_number > len(self.lines):
                    line = ''
                else:
                    line = self.lines[line_number - 1]
                print(line.rstrip())
                sys.stdout.flush()
                # Caret marker under the offending column.
                print(re.sub(r'\S', ' ', line[:offset]) + '^')
                sys.stdout.flush()
            if self._show_pep8 and doc:
                print(' ' + doc.strip())
                sys.stdout.flush()
        return self.file_errors

11
old/flake8/run.py Normal file
View file

@ -0,0 +1,11 @@
"""
Implementation of the command-line I{flake8} tool.
"""
from flake8.hooks import git_hook, hg_hook # noqa
from flake8.main import check_code, check_file, Flake8Command # noqa
from flake8.main import main
if __name__ == '__main__':
main()

View file

@ -0,0 +1 @@
#

View file

@ -0,0 +1,309 @@
"""
_test_warnings.py
Tests for the warnings that are emitted by flake8.
This module is named _test_warnings instead of test_warnings so that a
normal nosetests run does not collect it. The tests in this module pass
when they are run alone, but they fail when they are run along with other
tests (nosetests --with-isolation doesn't help).
In tox.ini, these tests are run separately.
"""
from __future__ import with_statement
import os
import warnings
import unittest
try:
from unittest import mock
except ImportError:
import mock # < PY33
from flake8 import engine
from flake8.util import is_windows
# The Problem
# ------------
#
# Some of the tests in this module pass when this module is run on its own, but
# they fail when this module is run as part of the whole test suite. These are
# the problematic tests:
#
# test_jobs_verbose
# test_stdin_jobs_warning
#
# On some platforms, the warnings.capture_warnings function doesn't work
# properly when run with the other flake8 tests. It drops some warnings, even
# though the warnings filter is set to 'always'. However, when run separately,
# these tests pass.
#
# This problem only occurs on Windows, with Python 3.3 and older. Maybe it's
# related to PEP 446 - Inheritable file descriptors?
#
#
#
#
# Things that didn't work
# ------------
#
# Nose --attr
# I tried using the nosetests --attr feature to run the tests separately. I
# put the following in setup.cfg
#
# [nosetests]
# attr=!run_alone
#
# Then I added a tox section that did this
#
# nosetests --attr=run_alone
#
# However, the command line --attr would not override the config file --attr,
# so the special tox section wound up running all the tests, and failing.
#
#
#
# Nose --with-isolation
# The nosetests --with-isolation flag did not help.
#
#
#
# unittest.skipIf
# I tried decorating the problematic tests with the unittest.skipIf
# decorator.
#
# @unittest.skipIf(is_windows() and sys.version_info < (3, 4),
# "Fails on Windows with Python < 3.4 when run with other"
# " tests.")
#
# The idea is, skip the tests in the main test run, on affected platforms.
# Then, only on those platforms, come back in later and run the tests
# separately.
#
# I added a new stanza to tox.ini, to run the tests separately on the
# affected platforms.
#
# nosetests --no-skip
#
# I ran in to a bug in the nosetests skip plugin. It would report the test as
# having been run, but it would not actually run the test. So, when run with
# --no-skip, the following test would be reported as having run and passed!
#
# @unittest.skip("This passes o_o")
# def test_should_fail(self):
# assert 0
#
# This bug has been reported here:
# "--no-skip broken with Python 2.7"
# https://github.com/nose-devs/nose/issues/512
#
#
#
# py.test
#
# I tried using py.test, and its @pytest.mark.xfail decorator. I added some
# separate stanzas in tox, and using the pytest --runxfail option to run the
# tests separately. This allows us to run all the tests together, on
# platforms that allow it. On platforms that don't allow us to run the tests
# all together, this still runs all the tests, but in two separate steps.
#
# This is the same solution as the nosetests --no-skip solution I described
# above, but --runxfail does not have the same bug as --no-skip.
#
# This has the advantage that all tests are discoverable by default, outside
# of tox. However, nose does not recognize the pytest.mark.xfail decorator.
# So, if a user runs nosetests, it still tries to run the problematic tests
# together with the rest of the test suite, causing them to fail.
#
#
#
#
#
#
# Solution
# ------------
# Move the problematic tests to _test_warnings.py, so nose.collector will not
# find them. Set up a separate section in tox.ini that runs this:
#
# nosetests flake8.tests._test_warnings
#
# This allows all tests to pass on all platforms, when run through tox.
# However, it means that, even on unaffected platforms, the problematic tests
# are not discovered and run outside of tox (if the user just runs nosetests
# manually, for example).
class IntegrationTestCaseWarnings(unittest.TestCase):
    """Integration style tests to check that warnings are issued properly for
    different command line options."""
    windows_warning_text = ("The --jobs option is not available on Windows."
                            " Ignoring --jobs arguments.")
    stdin_warning_text = ("The --jobs option is not compatible with"
                          " supplying input using - . Ignoring --jobs"
                          " arguments.")

    def this_file(self):
        """Return the real path of this file."""
        this_file = os.path.realpath(__file__)
        if this_file.endswith("pyc"):
            this_file = this_file[:-1]
        return this_file

    @staticmethod
    def get_style_guide_with_warnings(engine, *args, **kwargs):
        """
        Return a style guide object (obtained by calling
        engine.get_style_guide) and a list of the warnings that were raised
        in the process.

        Note: not threadsafe
        """
        # Note
        # https://docs.python.org/2/library/warnings.html
        #
        # The catch_warnings manager works by replacing and then later
        # restoring the module's showwarning() function and internal list of
        # filter specifications. This means the context manager is modifying
        # global state and therefore is not thread-safe
        with warnings.catch_warnings(record=True) as collected_warnings:
            # Cause all warnings to always be triggered.
            warnings.simplefilter("always")
            # Get the style guide
            style_guide = engine.get_style_guide(*args, **kwargs)
        # Now that the warnings have been collected, return the style guide
        # and the warnings.
        return (style_guide, collected_warnings)

    def verify_warnings(self, collected_warnings, expected_warnings):
        """
        Verifies that collected_warnings is a sequence that contains user
        warnings that match the sequence of string values passed in as
        expected_warnings.
        """
        if expected_warnings is None:
            expected_warnings = []
        collected_user_warnings = [w for w in collected_warnings
                                   if issubclass(w.category, UserWarning)]
        self.assertEqual(len(collected_user_warnings),
                         len(expected_warnings))
        collected_warnings_set = set(str(warning.message)
                                     for warning
                                     in collected_user_warnings)
        expected_warnings_set = set(expected_warnings)
        self.assertEqual(collected_warnings_set, expected_warnings_set)

    def check_files_collect_warnings(self,
                                     arglist=None,
                                     explicit_stdin=False,
                                     count=0,
                                     verbose=False):
        """Call check_files and collect any warnings that are issued."""
        # BUGFIX: the default used to be a shared mutable list ([]) that the
        # append below mutated, so '--verbose' accumulated across calls.
        # Copy the caller's list (or start fresh) instead.
        arglist = list(arglist) if arglist is not None else []
        if verbose:
            arglist.append('--verbose')
        if explicit_stdin:
            target_file = "-"
        else:
            target_file = self.this_file()
        argv = ['flake8'] + arglist + [target_file]
        with mock.patch("sys.argv", argv):
            (style_guide,
             collected_warnings,
             ) = self.get_style_guide_with_warnings(engine,
                                                    parse_argv=True)
            report = style_guide.check_files()
        self.assertEqual(report.total_errors, count)
        return style_guide, report, collected_warnings

    def check_files_no_warnings_allowed(self,
                                        arglist=None,
                                        explicit_stdin=False,
                                        count=0,
                                        verbose=False):
        """Call check_files, and assert that there were no warnings issued."""
        (style_guide,
         report,
         collected_warnings,
         ) = self.check_files_collect_warnings(arglist=arglist,
                                               explicit_stdin=explicit_stdin,
                                               count=count,
                                               verbose=verbose)
        self.verify_warnings(collected_warnings, expected_warnings=None)
        return style_guide, report

    def _job_tester(self, jobs, verbose=False):
        # mock stdout.flush so we can count the number of jobs created
        with mock.patch('sys.stdout.flush') as mocked:
            (guide,
             report,
             collected_warnings,
             ) = self.check_files_collect_warnings(
                 arglist=['--jobs=%s' % jobs],
                 verbose=verbose)
            if is_windows():
                # The code path where guide.options.jobs gets converted to an
                # int is not run on windows. So, do the int conversion here.
                self.assertEqual(int(guide.options.jobs), jobs)
                # On windows, call count is always zero.
                self.assertEqual(mocked.call_count, 0)
            else:
                self.assertEqual(guide.options.jobs, jobs)
                self.assertEqual(mocked.call_count, jobs)
        # (spelling fixed: this local was previously 'expected_warings')
        expected_warnings = []
        if verbose and is_windows():
            expected_warnings.append(self.windows_warning_text)
        self.verify_warnings(collected_warnings, expected_warnings)

    def test_jobs(self, verbose=False):
        self._job_tester(2, verbose=verbose)
        self._job_tester(10, verbose=verbose)

    def test_no_args_no_warnings(self, verbose=False):
        self.check_files_no_warnings_allowed(verbose=verbose)

    def test_stdin_jobs_warning(self, verbose=False):
        self.count = 0

        def fake_stdin():
            self.count += 1
            with open(self.this_file(), "r") as f:
                return f.read()

        with mock.patch("pep8.stdin_get_value", fake_stdin):
            (style_guide,
             report,
             collected_warnings,
             ) = self.check_files_collect_warnings(arglist=['--jobs=4'],
                                                   explicit_stdin=True,
                                                   verbose=verbose)
        expected_warnings = []
        if verbose:
            expected_warnings.append(self.stdin_warning_text)
            if is_windows():
                expected_warnings.append(self.windows_warning_text)
        self.verify_warnings(collected_warnings, expected_warnings)
        self.assertEqual(self.count, 1)

    def test_jobs_verbose(self):
        self.test_jobs(verbose=True)

    def test_no_args_no_warnings_verbose(self):
        self.test_no_args_no_warnings(verbose=True)

    def test_stdin_jobs_warning_verbose(self):
        self.test_stdin_jobs_warning(verbose=True)


if __name__ == '__main__':
    unittest.main()

View file

@ -0,0 +1,236 @@
from __future__ import with_statement
import errno
import unittest
try:
from unittest import mock
except ImportError:
import mock # < PY33
from flake8 import engine, util, __version__, reporter
import pep8
class TestEngine(unittest.TestCase):
    """Unit tests for flake8.engine, using mocks for pep8 and extensions."""
    def setUp(self):
        # Map of patch-target string -> active mock.patch object.
        self.patches = {}
    def tearDown(self):
        # Any test that used start_patch must have called stop_patches.
        assert len(self.patches.items()) == 0
    def start_patch(self, patch):
        """Start patching `patch` and return the created mock."""
        self.patches[patch] = mock.patch(patch)
        return self.patches[patch].start()
    def stop_patches(self):
        """Stop and forget every patch started via start_patch()."""
        patches = self.patches.copy()
        for k, v in patches.items():
            v.stop()
            del(self.patches[k])
    def test_get_style_guide(self):
        with mock.patch('flake8.engine._register_extensions') as reg_ext:
            # _register_extensions returns a 4-tuple; empty here.
            reg_ext.return_value = ([], [], [], [])
            g = engine.get_style_guide()
            self.assertTrue(isinstance(g, engine.StyleGuide))
            reg_ext.assert_called_once_with()
    def test_get_style_guide_kwargs(self):
        m = mock.Mock()
        with mock.patch('flake8.engine.StyleGuide') as StyleGuide:
            with mock.patch('flake8.engine.get_parser') as get_parser:
                m.ignored_extensions = []
                StyleGuide.return_value.options.jobs = '42'
                StyleGuide.return_value.options.diff = False
                get_parser.return_value = (m, [])
                # Extra kwargs must be forwarded to StyleGuide.
                engine.get_style_guide(foo='bar')
                get_parser.assert_called_once_with()
            StyleGuide.assert_called_once_with(**{'parser': m, 'foo': 'bar'})
    def test_register_extensions(self):
        with mock.patch('pep8.register_check') as register_check:
            registered_exts = engine._register_extensions()
            self.assertTrue(isinstance(registered_exts[0], util.OrderedSet))
            self.assertTrue(len(registered_exts[0]) > 0)
            for i in registered_exts[1:]:
                self.assertTrue(isinstance(i, list))
            self.assertTrue(register_check.called)
    def test_disable_extensions(self):
        parser = mock.MagicMock()
        options = mock.MagicMock()
        parser.ignored_extensions = ['I123', 'I345', 'I678', 'I910']
        options.enable_extensions = 'I345,\nI678,I910'
        options.ignore = ('E121', 'E123')
        # Only the extension codes NOT explicitly enabled should be ignored.
        engine._disable_extensions(parser, options)
        self.assertEqual(set(options.ignore), set(['E121', 'E123', 'I123']))
    def test_get_parser(self):
        # setup
        re = self.start_patch('flake8.engine._register_extensions')
        gpv = self.start_patch('flake8.engine.get_python_version')
        pgp = self.start_patch('pep8.get_parser')
        m = mock.Mock()
        re.return_value = ([('pyflakes', '0.7'), ('mccabe', '0.2')], [], [],
                           [])
        gpv.return_value = 'Python Version'
        pgp.return_value = m
        # actual call we're testing
        parser, hooks = engine.get_parser()
        # assertions
        self.assertTrue(re.called)
        self.assertTrue(gpv.called)
        pgp.assert_called_once_with(
            'flake8',
            '%s (pyflakes: 0.7, mccabe: 0.2) Python Version' % __version__)
        self.assertTrue(m.remove_option.called)
        self.assertTrue(m.add_option.called)
        self.assertEqual(parser, m)
        self.assertEqual(hooks, [])
        # clean-up
        self.stop_patches()
    def test_get_python_version(self):
        self.assertTrue('on' in engine.get_python_version())
        # Silly test but it will provide 100% test coverage
        # Also we can never be sure (without reconstructing the string
        # ourselves) what system we may be testing on.
    def test_windows_disables_jobs(self):
        with mock.patch('flake8.util.is_windows') as is_windows:
            is_windows.return_value = True
            guide = engine.get_style_guide()
            assert isinstance(guide, reporter.BaseQReport) is False
    def test_stdin_disables_jobs(self):
        with mock.patch('flake8.util.is_using_stdin') as is_using_stdin:
            is_using_stdin.return_value = True
            guide = engine.get_style_guide()
            assert isinstance(guide, reporter.BaseQReport) is False
    def test_disables_extensions_that_are_not_selected(self):
        with mock.patch('flake8.engine._register_extensions') as re:
            # Fourth element: codes that are off by default.
            re.return_value = ([('fake_ext', '0.1a1')], [], [], ['X'])
            sg = engine.get_style_guide()
            assert 'X' in sg.options.ignore
    def test_enables_off_by_default_extensions(self):
        with mock.patch('flake8.engine._register_extensions') as re:
            re.return_value = ([('fake_ext', '0.1a1')], [], [], ['X'])
            parser, options = engine.get_parser()
            # Explicitly selecting an off-by-default code must enable it.
            parser.parse_args(['--select=X'])
            sg = engine.StyleGuide(parser=parser)
            assert 'X' not in sg.options.ignore
    def test_load_entry_point_verifies_requirements(self):
        entry_point = mock.Mock(spec=['require', 'resolve', 'load'])
        engine._load_entry_point(entry_point, verify_requirements=True)
        entry_point.require.assert_called_once_with()
        entry_point.resolve.assert_called_once_with()
    def test_load_entry_point_does_not_verify_requirements(self):
        entry_point = mock.Mock(spec=['require', 'resolve', 'load'])
        engine._load_entry_point(entry_point, verify_requirements=False)
        self.assertFalse(entry_point.require.called)
        entry_point.resolve.assert_called_once_with()
    def test_load_entry_point_passes_require_argument_to_load(self):
        # Older setuptools entry points expose only load(require=...).
        entry_point = mock.Mock(spec=['load'])
        engine._load_entry_point(entry_point, verify_requirements=True)
        entry_point.load.assert_called_once_with(require=True)
        entry_point.reset_mock()
        engine._load_entry_point(entry_point, verify_requirements=False)
        entry_point.load.assert_called_once_with(require=False)
def oserror_generator(error_number, message='Ominous OSError message'):
    """Build a side-effect callable that raises ``OSError`` exactly once.

    The first invocation raises ``OSError(error_number, message)``;
    every later invocation returns ``None``.
    """
    def raise_once(*args, **kwargs):
        # A function attribute acts as the "already fired" flag, so the
        # state persists across calls without needing a closure cell.
        if getattr(raise_once, 'used', False):
            return None
        raise_once.used = True
        raise OSError(error_number, message)
    return raise_once
class TestStyleGuide(unittest.TestCase):
    """Tests for the StyleGuide proxy wrapper around NoQAStyleGuide."""
    def setUp(self):
        mocked_styleguide = mock.Mock(spec=engine.NoQAStyleGuide)
        self.styleguide = engine.StyleGuide(styleguide=mocked_styleguide)
        self.mocked_sg = mocked_styleguide
    def test_proxies_excluded(self):
        self.styleguide.excluded('file.py', parent='.')
        self.mocked_sg.excluded.assert_called_once_with('file.py', parent='.')
    def test_proxies_init_report(self):
        reporter = object()
        self.styleguide.init_report(reporter)
        self.mocked_sg.init_report.assert_called_once_with(reporter)
    def test_proxies_check_files(self):
        self.styleguide.check_files(['foo', 'bar'])
        self.mocked_sg.check_files.assert_called_once_with(
            paths=['foo', 'bar']
        )
    def test_proxies_input_file(self):
        self.styleguide.input_file('file.py',
                                   lines=[9, 10],
                                   expected='foo',
                                   line_offset=20)
        self.mocked_sg.input_file.assert_called_once_with(filename='file.py',
                                                          lines=[9, 10],
                                                          expected='foo',
                                                          line_offset=20)
    def test_check_files_retries_on_specific_OSErrors(self):
        # The side effect raises ENOSPC only on the first call; asserting
        # init_report(StandardReport) pins the retry/reset behaviour --
        # see engine.StyleGuide for the actual retry logic.
        self.mocked_sg.check_files.side_effect = oserror_generator(
            errno.ENOSPC, 'No space left on device'
        )
        self.styleguide.check_files(['foo', 'bar'])
        self.mocked_sg.init_report.assert_called_once_with(pep8.StandardReport)
    def test_input_file_retries_on_specific_OSErrors(self):
        self.mocked_sg.input_file.side_effect = oserror_generator(
            errno.ENOSPC, 'No space left on device'
        )
        self.styleguide.input_file('file.py')
        self.mocked_sg.init_report.assert_called_once_with(pep8.StandardReport)
    def test_check_files_reraises_unknown_OSErrors(self):
        # EADDRINUSE is not in the retry set, so it must propagate.
        self.mocked_sg.check_files.side_effect = oserror_generator(
            errno.EADDRINUSE,
            'lol why are we talking about binding to sockets'
        )
        self.assertRaises(OSError, self.styleguide.check_files,
                          ['foo', 'bar'])
    def test_input_file_reraises_unknown_OSErrors(self):
        self.mocked_sg.input_file.side_effect = oserror_generator(
            errno.EADDRINUSE,
            'lol why are we talking about binding to sockets'
        )
        self.assertRaises(OSError, self.styleguide.input_file,
                          ['foo', 'bar'])
if __name__ == '__main__':
    unittest.main()

View file

@ -0,0 +1,59 @@
"""Module containing the tests for flake8.hooks."""
import os
import unittest
try:
from unittest import mock
except ImportError:
import mock
import flake8.hooks
from flake8.util import is_windows
def excluded(filename):
    """Stand-in exclusion predicate: exclude any path ending in afile.py."""
    excluded_suffix = 'afile.py'
    return filename.endswith(excluded_suffix)
class TestGitHook(unittest.TestCase):
    """Tests for the git pre-commit hook helper (flake8.hooks.git_hook)."""
    # BUGFIX: is_windows is a function and must be *called*.  The original
    # code tested the function object itself, which is always truthy, so the
    # Windows branch ran on every platform (harmless only by accident,
    # because splitdrive() returns an empty drive on POSIX).
    if is_windows():
        # On Windows, absolute paths start with a drive letter, for example
        # C:.  Here we build a fake absolute path starting with the current
        # drive letter, for example C:\fake\tmp.
        current_drive, ignore_tail = os.path.splitdrive(os.getcwd())
        fake_abs_path = os.path.join(current_drive, os.path.sep,
                                     'fake', 'tmp')
    else:
        fake_abs_path = os.path.join(os.path.sep, 'fake', 'tmp')

    @mock.patch('os.makedirs')
    @mock.patch('flake8.hooks.open', create=True)
    @mock.patch('shutil.rmtree')
    @mock.patch('tempfile.mkdtemp', return_value=fake_abs_path)
    @mock.patch('flake8.hooks.run',
                return_value=(None,
                              [os.path.join('foo', 'afile.py'),
                               os.path.join('foo', 'bfile.py')],
                              None))
    @mock.patch('flake8.hooks.get_style_guide')
    def test_prepends_tmp_directory_to_exclude(self, get_style_guide, run,
                                               *args):
        """git_hook must check copies under the temp dir, minus exclusions."""
        style_guide = get_style_guide.return_value = mock.Mock()
        style_guide.options.exclude = [os.path.join('foo', 'afile.py')]
        style_guide.options.filename = [os.path.join('foo', '*')]
        style_guide.excluded = excluded
        flake8.hooks.git_hook()
        dirname, filename = os.path.split(
            os.path.abspath(os.path.join('foo', 'bfile.py')))
        if is_windows():
            # On Windows, the absolute path in dirname starts with a drive
            # letter.  Here, we discard the drive letter.
            ignore_drive, dirname = os.path.splitdrive(dirname)
        tmpdir = os.path.join(self.fake_abs_path, dirname[1:])
        tmpfile = os.path.join(tmpdir, 'bfile.py')
        # Only bfile.py survives: afile.py is excluded by `excluded` above.
        style_guide.check_files.assert_called_once_with([tmpfile])


if __name__ == '__main__':
    unittest.main()

View file

@ -0,0 +1,79 @@
from __future__ import with_statement
import os
import unittest
try:
from unittest import mock
except ImportError:
import mock # < PY33
from flake8 import engine
from flake8.util import is_windows
class IntegrationTestCase(unittest.TestCase):
"""Integration style tests to exercise different command line options."""
def this_file(self):
"""Return the real path of this file."""
this_file = os.path.realpath(__file__)
if this_file.endswith("pyc"):
this_file = this_file[:-1]
return this_file
def check_files(self, arglist=[], explicit_stdin=False, count=0):
"""Call check_files."""
if explicit_stdin:
target_file = "-"
else:
target_file = self.this_file()
argv = ['flake8'] + arglist + [target_file]
with mock.patch("sys.argv", argv):
style_guide = engine.get_style_guide(parse_argv=True)
report = style_guide.check_files()
self.assertEqual(report.total_errors, count)
return style_guide, report
def test_no_args(self):
# assert there are no reported errors
self.check_files()
def _job_tester(self, jobs):
# mock stdout.flush so we can count the number of jobs created
with mock.patch('sys.stdout.flush') as mocked:
guide, report = self.check_files(arglist=['--jobs=%s' % jobs])
if is_windows():
# The code path where guide.options.jobs gets converted to an
# int is not run on windows. So, do the int conversion here.
self.assertEqual(int(guide.options.jobs), jobs)
# On windows, call count is always zero.
self.assertEqual(mocked.call_count, 0)
else:
self.assertEqual(guide.options.jobs, jobs)
self.assertEqual(mocked.call_count, jobs)
def test_jobs(self):
self._job_tester(2)
self._job_tester(10)
def test_stdin(self):
self.count = 0
def fake_stdin():
self.count += 1
with open(self.this_file(), "r") as f:
return f.read()
with mock.patch("pep8.stdin_get_value", fake_stdin):
guide, report = self.check_files(arglist=['--jobs=4'],
explicit_stdin=True)
self.assertEqual(self.count, 1)
def test_stdin_fail(self):
def fake_stdin():
return "notathing\n"
with mock.patch("pep8.stdin_get_value", fake_stdin):
# only assert needed is in check_files
guide, report = self.check_files(arglist=['--jobs=4'],
explicit_stdin=True,
count=1)

View file

@ -0,0 +1,18 @@
from __future__ import with_statement
import unittest
import setuptools
from flake8 import main
class TestMain(unittest.TestCase):
    def test_issue_39_regression(self):
        """Flake8Command must run against a bare setuptools Distribution."""
        distribution = setuptools.Distribution()
        cmd = main.Flake8Command(distribution)
        # An empty options dict previously triggered the issue-39 failure.
        cmd.options_dict = {}
        cmd.run()
if __name__ == '__main__':
    unittest.main()

View file

@ -0,0 +1,73 @@
from __future__ import with_statement
import ast
import unittest
from collections import namedtuple
from flake8._pyflakes import FlakesChecker
# Lightweight stand-in for the parsed options object that
# FlakesChecker.parse_options expects.
Options = namedtuple("Options", ['builtins', 'doctests',
                                 'include_in_doctest',
                                 'exclude_from_doctest'])
class TestFlakesChecker(unittest.TestCase):
    """Tests for doctest include/exclude path handling in FlakesChecker."""
    def setUp(self):
        # Any small valid module works; the tests only exercise options.
        self.tree = ast.parse('print("cookies")')
    def test_doctest_flag_enabled(self):
        options = Options(builtins=None, doctests=True,
                          include_in_doctest='',
                          exclude_from_doctest='')
        FlakesChecker.parse_options(options)
        flake_checker = FlakesChecker(self.tree, 'cookies.txt')
        assert flake_checker.withDoctest is True
    def test_doctest_flag_disabled(self):
        options = Options(builtins=None, doctests=False,
                          include_in_doctest='',
                          exclude_from_doctest='')
        FlakesChecker.parse_options(options)
        flake_checker = FlakesChecker(self.tree, 'cookies.txt')
        assert flake_checker.withDoctest is False
    def test_doctest_flag_enabled_exclude_file(self):
        # Exclusion beats the global doctests=True flag.
        options = Options(builtins=None, doctests=True,
                          include_in_doctest='',
                          exclude_from_doctest='cookies.txt,'
                                               'hungry/cookies.txt')
        FlakesChecker.parse_options(options)
        flake_checker = FlakesChecker(self.tree, './cookies.txt')
        assert flake_checker.withDoctest is False
    def test_doctest_flag_disabled_include_file(self):
        # Inclusion beats the global doctests=False flag.
        options = Options(builtins=None, doctests=False,
                          include_in_doctest='./cookies.txt,cake_yuck.txt',
                          exclude_from_doctest='')
        FlakesChecker.parse_options(options)
        flake_checker = FlakesChecker(self.tree, './cookies.txt')
        assert flake_checker.withDoctest is True
    def test_doctest_flag_disabled_include_file_exclude_dir(self):
        # A more specific include overrides a broader exclude.
        options = Options(builtins=None, doctests=False,
                          include_in_doctest='./cookies.txt',
                          exclude_from_doctest='./')
        FlakesChecker.parse_options(options)
        flake_checker = FlakesChecker(self.tree, './cookies.txt')
        assert flake_checker.withDoctest is True
    def test_doctest_flag_disabled_include_dir_exclude_file(self):
        # A more specific exclude overrides a broader include.
        options = Options(builtins=None, doctests=False,
                          include_in_doctest='./',
                          exclude_from_doctest='./cookies.txt')
        FlakesChecker.parse_options(options)
        flake_checker = FlakesChecker(self.tree, './cookies.txt')
        assert flake_checker.withDoctest is False
    def test_doctest_flag_disabled_include_file_exclude_file_error(self):
        # Listing the same file in both sets is contradictory -> ValueError.
        options = Options(builtins=None, doctests=False,
                          include_in_doctest='./cookies.txt',
                          exclude_from_doctest='./cookies.txt,cake_yuck.txt')
        self.assertRaises(ValueError, FlakesChecker.parse_options, options)

View file

@ -0,0 +1,36 @@
from __future__ import with_statement
import errno
import unittest
try:
from unittest import mock
except ImportError:
import mock # < PY33
from flake8 import reporter
def ioerror_report_factory(errno_code):
    """Return a BaseQReport whose worker main loop raises IOError(errno_code)."""
    class IOErrorBaseQReport(reporter.BaseQReport):
        def _process_main(self):
            raise IOError(errno_code, 'Fake bad pipe exception')
    # BaseQReport only needs an options object with a jobs attribute here.
    options = mock.MagicMock()
    options.jobs = 2
    return IOErrorBaseQReport(options)
class TestBaseQReport(unittest.TestCase):
    def test_does_not_raise_a_bad_pipe_ioerror(self):
        """Test that no EPIPE IOError exception is re-raised or leaked."""
        report = ioerror_report_factory(errno.EPIPE)
        try:
            report.process_main()
        except IOError:
            self.fail('BaseQReport.process_main raised an IOError for EPIPE'
                      ' but it should have caught this exception.')
    def test_raises_a_enoent_ioerror(self):
        """Test that an ENOENT IOError exception is re-raised."""
        report = ioerror_report_factory(errno.ENOENT)
        self.assertRaises(IOError, report.process_main)

View file

@ -0,0 +1,120 @@
import optparse
import unittest
from flake8.util import option_normalizer
class TestOptionSerializerParsesTrue(unittest.TestCase):
    """option_normalizer must accept every truthy spelling for boolean opts."""

    def setUp(self):
        self.option = optparse.Option('--foo', action='store_true')
        self.option_name = 'fake_option'

    def test_1_is_true(self):
        value = option_normalizer('1', self.option, self.option_name)
        self.assertTrue(value)

    def test_T_is_true(self):
        value = option_normalizer('T', self.option, self.option_name)
        self.assertTrue(value)

    def test_TRUE_is_true(self):
        value = option_normalizer('TRUE', self.option, self.option_name)
        # BUGFIX: assertTrue's second argument is the failure *message*;
        # the stray `, True` did nothing and has been removed.
        self.assertTrue(value)

    def test_ON_is_true(self):
        value = option_normalizer('ON', self.option, self.option_name)
        self.assertTrue(value)

    def test_t_is_true(self):
        value = option_normalizer('t', self.option, self.option_name)
        self.assertTrue(value)

    def test_true_is_true(self):
        value = option_normalizer('true', self.option, self.option_name)
        self.assertTrue(value)

    def test_on_is_true(self):
        value = option_normalizer('on', self.option, self.option_name)
        self.assertTrue(value)
class TestOptionSerializerParsesFalse(unittest.TestCase):
    """option_normalizer must accept every falsy spelling for boolean opts."""
    def setUp(self):
        self.option = optparse.Option('--foo', action='store_true')
        self.option_name = 'fake_option'
    def test_0_is_false(self):
        value = option_normalizer('0', self.option, self.option_name)
        self.assertFalse(value)
    def test_F_is_false(self):
        value = option_normalizer('F', self.option, self.option_name)
        self.assertFalse(value)
    def test_FALSE_is_false(self):
        value = option_normalizer('FALSE', self.option, self.option_name)
        self.assertFalse(value)
    def test_OFF_is_false(self):
        value = option_normalizer('OFF', self.option, self.option_name)
        self.assertFalse(value)
    def test_f_is_false(self):
        value = option_normalizer('f', self.option, self.option_name)
        self.assertFalse(value)
    def test_false_is_false(self):
        value = option_normalizer('false', self.option, self.option_name)
        self.assertFalse(value)
    def test_off_is_false(self):
        value = option_normalizer('off', self.option, self.option_name)
        self.assertFalse(value)
class TestOptionSerializerParsesLists(unittest.TestCase):
    """option_normalizer must split comma-separated list options."""
    def setUp(self):
        self.option = optparse.Option('--select')
        self.option_name = 'select'
        self.answer = ['F401', 'F402', 'F403', 'F404']
    def test_parses_simple_comma_separated_lists(self):
        value = option_normalizer('F401,F402,F403,F404', self.option,
                                  self.option_name)
        self.assertEqual(value, self.answer)
    def test_parses_less_simple_comma_separated_lists(self):
        # Whitespace around commas must be stripped.
        value = option_normalizer('F401 ,F402 ,F403 ,F404', self.option,
                                  self.option_name)
        self.assertEqual(value, self.answer)
        value = option_normalizer('F401, F402, F403, F404', self.option,
                                  self.option_name)
        self.assertEqual(value, self.answer)
    def test_parses_comma_separated_lists_with_newlines(self):
        # Newlines and a trailing comma (typical in config files) are fine.
        value = option_normalizer('''\
F401,
F402,
F403,
F404,
''', self.option, self.option_name)
        self.assertEqual(value, self.answer)
class TestOptionSerializerParsesInts(unittest.TestCase):
    """option_normalizer must cast values of int-typed options."""
    def setUp(self):
        self.option = optparse.Option('--max-complexity', type='int')
        self.option_name = 'max_complexity'
    def test_parses_an_int(self):
        value = option_normalizer('2', self.option, self.option_name)
        self.assertEqual(value, 2)
if __name__ == '__main__':
    unittest.main()

77
old/flake8/util.py Normal file
View file

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
import os
try:
    import ast
    iter_child_nodes = ast.iter_child_nodes
except ImportError:   # Python 2.5: no `ast` module; fall back to `_ast`.
    import _ast as ast
    if 'decorator_list' not in ast.ClassDef._fields:
        # Patch the missing attribute 'decorator_list'
        ast.ClassDef.decorator_list = ()
        ast.FunctionDef.decorator_list = property(lambda s: s.decorators)
    def iter_child_nodes(node):
        """
        Yield all direct child nodes of *node*, that is, all fields that
        are nodes and all items of fields that are lists of nodes.
        """
        if not node._fields:
            return
        for name in node._fields:
            field = getattr(node, name, None)
            if isinstance(field, ast.AST):
                yield field
            elif isinstance(field, list):
                for item in field:
                    if isinstance(item, ast.AST):
                        yield item
class OrderedSet(list):
    """A list that ignores duplicate additions, preserving insert order."""

    __slots__ = ()

    def add(self, value):
        """Append ``value`` unless it is already present."""
        already_present = value in self
        if not already_present:
            self.append(value)
def is_windows():
    """Return True when running on a Windows ('nt') platform."""
    windows_os_name = 'nt'
    return os.name == windows_os_name
def is_using_stdin(paths):
    """Return True when '-' (stdin) appears among the given paths."""
    for candidate in paths:
        if candidate == '-':
            return True
    return False
def warn_when_using_jobs(options):
    # Truthy only in verbose mode when --jobs was supplied as a digit
    # string greater than one.  NOTE: the chain short-circuits, so this may
    # return a falsy non-bool (e.g. None or '') rather than False.
    return (options.verbose and options.jobs and options.jobs.isdigit() and
            int(options.jobs) > 1)
def force_disable_jobs(styleguide):
    """Return True when multiprocessing must be disabled (Windows or stdin)."""
    if is_windows():
        return True
    return is_using_stdin(styleguide.paths)
# Option categories recognised by option_normalizer.
INT_TYPES = ('int', 'count')
BOOL_TYPES = ('store_true', 'store_false')
LIST_OPTIONS = ('select', 'ignore', 'exclude', 'enable_extensions')


def option_normalizer(value, option, option_name):
    """Coerce a raw config value to the type implied by *option*.

    Boolean actions accept 1/t/true/on and 0/f/false/off (any case);
    int-typed options are cast with int(); the well-known list options
    are split on commas with surrounding whitespace stripped.  Anything
    unrecognised is returned unchanged.
    """
    if option.action in BOOL_TYPES:
        text = str(value).upper()
        if text in ('1', 'T', 'TRUE', 'ON'):
            value = True
        if text in ('0', 'F', 'FALSE', 'OFF'):
            value = False
    elif option.type in INT_TYPES:
        value = int(value)
    elif option_name in LIST_OPTIONS:
        if isinstance(value, str):
            value = [item.strip() for item in value.split(',') if item.strip()]
    return value

75
old/setup.py Normal file
View file

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
from __future__ import with_statement
from setuptools import setup
try:
    # Work around a traceback with Nose on Python 2.6
    # http://bugs.python.org/issue15881#msg170215
    __import__('multiprocessing')
except ImportError:
    pass
try:
    # Use https://docs.python.org/3/library/unittest.mock.html
    from unittest import mock
except ImportError:
    # < Python 3.3
    mock = None
# Nose is always needed to run the test suite; the external `mock` package
# is only needed where unittest.mock is unavailable.
tests_require = ['nose']
if mock is None:
    tests_require += ['mock']
def get_version(fname='flake8/__init__.py'):
    """Extract the package version string from *fname*."""
    with open(fname) as source:
        for source_line in source:
            if source_line.startswith('__version__'):
                # The line looks like: __version__ = '2.5.4'
                return eval(source_line.split('=')[-1])
def get_long_description():
    """Concatenate the README and changelog for the PyPI long description."""
    sections = []
    for doc_name in ('README.rst', 'CHANGES.rst'):
        with open(doc_name) as doc_file:
            sections.append(doc_file.read())
    return '\n\n'.join(sections)
setup(
    name="flake8",
    license="MIT",
    version=get_version(),
    description="the modular source code checker: pep8, pyflakes and co",
    long_description=get_long_description(),
    author="Tarek Ziade",
    author_email="tarek@ziade.org",
    maintainer="Ian Cordasco",
    maintainer_email="graffatcolmingov@gmail.com",
    url="https://gitlab.com/pycqa/flake8",
    packages=["flake8", "flake8.tests"],
    # Specific pep8 releases are excluded for known incompatibilities.
    install_requires=[
        "pyflakes >= 0.8.1, < 1.1",
        "pep8 >= 1.5.7, != 1.6.0, != 1.6.1, != 1.6.2",
        "mccabe >= 0.2.1, < 0.5",
    ],
    entry_points={
        # `python setup.py flake8` support.
        'distutils.commands': ['flake8 = flake8.main:Flake8Command'],
        'console_scripts': ['flake8 = flake8.main:main'],
        # Register the bundled pyflakes wrapper as the F-code checker.
        'flake8.extension': [
            'F = flake8._pyflakes:FlakesChecker',
        ],
    },
    classifiers=[
        "Environment :: Console",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: MIT License",
        "Programming Language :: Python",
        "Programming Language :: Python :: 2",
        "Programming Language :: Python :: 3",
        "Topic :: Software Development :: Libraries :: Python Modules",
        "Topic :: Software Development :: Quality Assurance",
    ],
    tests_require=tests_require,
    test_suite='nose.collector',
)

40
old/tox.ini Normal file
View file

@ -0,0 +1,40 @@
[tox]
minversion = 1.6
envlist =
py26,py27,py33,py34,py27-flake8,py34-flake8
[testenv]
usedevelop = True
deps =
mock
nose
commands =
python setup.py test -q
python setup.py flake8
nosetests flake8.tests._test_warnings
[testenv:py27-flake8]
basepython = python2.7
deps =
flake8
commands = flake8 {posargs} flake8/
[testenv:py34-flake8]
basepython = python3.4
deps =
flake8
commands = flake8 {posargs} flake8/
[testenv:release]
basepython = python2.7
deps =
twine >= 1.5.0
wheel
commands =
python setup.py sdist bdist_wheel
twine upload --skip-existing {posargs} dist/*
[flake8]
select = E,F,W
max_line_length = 79
exclude = .git,.tox,dist,docs,*egg