Merge flint-merge branch

This commit is contained in:
Ian Cordasco 2013-02-22 11:37:53 -05:00
commit 50e3ce9c78
16 changed files with 555 additions and 1002 deletions

View file

@ -1,2 +1,2 @@
__version__ = '2.0'
__version__ = '2.0a1'

47
flake8/_pyflakes.py Normal file
View file

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
import pyflakes
import pyflakes.checker
def patch_pyflakes():
"""Add error codes to Pyflakes messages."""
codes = dict([line.split()[::-1] for line in (
'F401 UnusedImport',
'F402 ImportShadowedByLoopVar',
'F403 ImportStarUsed',
'F404 LateFutureImport',
'F810 Redefined', # XXX Obsolete?
'F811 RedefinedWhileUnused',
'F812 RedefinedInListComp',
'F821 UndefinedName',
'F822 UndefinedExport',
'F823 UndefinedLocal',
'F831 DuplicateArgument',
'F841 UnusedVariable',
)])
for name, obj in vars(pyflakes.messages).items():
if name[0].isupper() and obj.message:
obj.flake8_msg = '%s %s' % (codes.get(name, 'F999'), obj.message)
patch_pyflakes()
class FlakesChecker(pyflakes.checker.Checker):
"""Subclass the Pyflakes checker to conform with the flake8 API."""
name = 'pyflakes'
version = pyflakes.__version__
@classmethod
def add_options(cls, parser):
parser.add_option('--builtins',
help="define more built-ins, comma separated")
parser.config_options.append('builtins')
@classmethod
def parse_options(cls, options):
if options.builtins:
cls.builtIns = cls.builtIns.union(options.builtins.split(','))
def run(self):
for m in self.messages:
yield m.lineno, 0, (m.flake8_msg % m.message_args), m.__class__

76
flake8/engine.py Normal file
View file

@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
import re
import pep8
from flake8 import __version__
from flake8.util import OrderedSet
_flake8_noqa = re.compile(r'flake8[:=]\s*noqa', re.I).search
def _register_extensions():
"""Register all the extensions."""
extensions = OrderedSet()
extensions.add(('pep8', pep8.__version__))
parser_hooks = []
options_hooks = []
try:
from pkg_resources import iter_entry_points
except ImportError:
pass
else:
for entry in iter_entry_points('flake8.extension'):
checker = entry.load()
pep8.register_check(checker, codes=[entry.name])
extensions.add((checker.name, checker.version))
if hasattr(checker, 'add_options'):
parser_hooks.append(checker.add_options)
if hasattr(checker, 'parse_options'):
options_hooks.append(checker.parse_options)
return extensions, parser_hooks, options_hooks
def get_parser():
(extensions, parser_hooks, options_hooks) = _register_extensions()
details = ', '.join(['%s: %s' % ext for ext in extensions])
parser = pep8.get_parser('flake8', '%s (%s)' % (__version__, details))
for opt in ('--repeat', '--testsuite', '--doctest'):
try:
parser.remove_option(opt)
except ValueError:
pass
parser.add_option('--exit-zero', action='store_true',
help="exit with code 0 even if there are errors")
for parser_hook in parser_hooks:
parser_hook(parser)
parser.add_option('--install-hook', default=False, action='store_true',
help='Install the appropriate hook for this '
'repository.', dest='install_hook')
return parser, options_hooks
class StyleGuide(pep8.StyleGuide):
# Backward compatibility pep8 <= 1.4.2
checker_class = pep8.Checker
def input_file(self, filename, lines=None, expected=None, line_offset=0):
"""Run all checks on a Python source file."""
if self.options.verbose:
print('checking %s' % filename)
fchecker = self.checker_class(
filename, lines=lines, options=self.options)
# Any "# flake8: noqa" line?
if any(_flake8_noqa(line) for line in fchecker.lines):
return 0
return fchecker.check_all(expected=expected, line_offset=line_offset)
def get_style_guide(**kwargs):
"""Parse the options and configure the checker."""
kwargs['parser'], options_hooks = get_parser()
styleguide = StyleGuide(**kwargs)
options = styleguide.options
for options_hook in options_hooks:
options_hook(options)
return styleguide

View file

@ -1,61 +1,58 @@
# -*- coding: utf-8 -*-
from __future__ import with_statement
import os
import sys
from flake8.util import (_initpep8, pep8style, skip_file, get_parser,
ConfigParser)
from subprocess import Popen, PIPE
try:
from configparser import ConfigParser
except ImportError: # Python 2
from ConfigParser import ConfigParser
from flake8.engine import get_parser, get_style_guide
from flake8.main import DEFAULT_CONFIG
def git_hook(complexity=-1, strict=False, ignore=None, lazy=False):
from flake8.main import check_file
_initpep8()
if ignore:
pep8style.options.ignore = ignore
warnings = 0
gitcmd = "git diff-index --cached --name-only HEAD"
if lazy:
gitcmd = gitcmd.replace('--cached ', '')
_, files_modified, _ = run(gitcmd)
for filename in files_modified:
ext = os.path.splitext(filename)[-1]
if ext != '.py':
continue
if not os.path.exists(filename):
continue
warnings += check_file(path=filename, ignore=ignore,
complexity=complexity)
flake8_style = get_style_guide(
config_file=DEFAULT_CONFIG, ignore=ignore, max_complexity=complexity)
report = flake8_style.check_files(files_modified)
if strict:
return warnings
return report.total_errors
return 0
def hg_hook(ui, repo, **kwargs):
from flake8.main import check_file
complexity = ui.config('flake8', 'complexity', default=-1)
config = ui.config('flake8', 'config', default=True)
_initpep8(config_file=config)
warnings = 0
for file_ in _get_files(repo, **kwargs):
warnings += check_file(file_, complexity)
strict = ui.configbool('flake8', 'strict', default=True)
config = ui.config('flake8', 'config', default=True)
if config is True:
config = DEFAULT_CONFIG
paths = _get_files(repo, **kwargs)
flake8_style = get_style_guide(
config_file=config, max_complexity=complexity)
report = flake8_style.check_files(paths)
if strict:
return warnings
return report.total_errors
return 0
def run(command):
p = Popen(command.split(), stdout=PIPE, stderr=PIPE)
p.wait()
return (p.returncode, [line.strip() for line in p.stdout.readlines()],
[line.strip() for line in p.stderr.readlines()])
(stdout, stderr) = p.communicate()
return (p.returncode, [line.strip() for line in stdout.splitlines()],
[line.strip() for line in stderr.splitlines()])
def _get_files(repo, **kwargs):
@ -66,11 +63,8 @@ def _get_files(repo, **kwargs):
if file_ in seen or not os.path.exists(file_):
continue
seen.add(file_)
if not file_.endswith('.py'):
continue
if skip_file(file_):
continue
yield file_
if file_.endswith('.py'):
yield file_
def find_vcs():

View file

@ -1,158 +1,79 @@
# -*- coding: utf-8 -*-
import os
import sys
import pep8
import pyflakes.api
import pyflakes.checker
import select
from flake8 import mccabe
from flake8.util import _initpep8, skip_file, get_parser, Flake8Reporter
pep8style = None
import setuptools
from flake8.engine import get_style_guide
if sys.platform.startswith('win'):
pep8.DEFAULT_CONFIG = os.path.expanduser(r'~\.flake8')
DEFAULT_CONFIG = os.path.expanduser(r'~\.flake8')
else:
pep8.DEFAULT_CONFIG = os.path.join(
DEFAULT_CONFIG = os.path.join(
os.getenv('XDG_CONFIG_HOME') or os.path.expanduser('~/.config'),
'flake8'
)
PROJECT_CONFIG = ['.flake8']
PROJECT_CONFIG.extend(pep8.PROJECT_CONFIG)
pep8.PROJECT_CONFIG = tuple(PROJECT_CONFIG)
def main():
global pep8style
# parse out our flags so pep8 doesn't get confused
parser = get_parser()
"""Parse options and run checks on Python source."""
# Prepare
flake8_style = get_style_guide(parse_argv=True, config_file=DEFAULT_CONFIG)
options = flake8_style.options
# We then have to re-parse argv to make sure pep8 is properly initialized
pep8style = pep8.StyleGuide(parse_argv=True, config_file=True,
parser=parser)
opts = pep8style.options
if opts.install_hook:
if options.install_hook:
from flake8.hooks import install_hook
install_hook()
warnings = 0
stdin = None
# Run the checkers
report = flake8_style.check_files()
complexity = opts.max_complexity
builtins = set(opts.builtins.split(','))
if builtins:
orig_builtins = set(pyflakes.checker._MAGIC_GLOBALS)
pyflakes.checker._MAGIC_GLOBALS = orig_builtins | builtins
# This is needed so we can ignore some items
pyflakes_reporter = Flake8Reporter(opts.ignore)
if pep8style.paths and opts.filename is not None:
for path in _get_python_files(pep8style.paths):
if path == '-':
if stdin is None:
stdin = read_stdin()
warnings += check_code(stdin, opts.ignore, complexity)
else:
warnings += check_file(path, opts.ignore, complexity,
pyflakes_reporter)
else:
stdin = read_stdin()
warnings += check_code(stdin, opts.ignore, complexity,
pyflakes_reporter)
if opts.exit_zero:
raise SystemExit(0)
raise SystemExit(warnings > 0)
# Print the final report
options = flake8_style.options
if options.statistics:
report.print_statistics()
if options.benchmark:
report.print_benchmark()
if report.total_errors:
if options.count:
sys.stderr.write(str(report.total_errors) + '\n')
if not options.exit_zero:
raise SystemExit(1)
def check_file(path, ignore=(), complexity=-1, reporter=None):
if pep8style.excluded(path):
return 0
warnings = pyflakes.api.checkPath(path, reporter)
warnings -= reporter.ignored_warnings
warnings += pep8style.input_file(path)
if complexity > -1:
warnings += mccabe.get_module_complexity(path, complexity)
return warnings
def check_file(path, ignore=(), complexity=-1):
flake8_style = get_style_guide(
config_file=DEFAULT_CONFIG, ignore=ignore, max_complexity=complexity)
return flake8_style.input_file(path)
def check_code(code, ignore=(), complexity=-1, reporter=None):
warnings = pyflakes.api.check(code, '<stdin>', reporter)
warnings -= reporter.ignored_warnings
warnings += pep8style.input_file('-', lines=code.split('\n'))
if complexity > -1:
warnings += mccabe.get_code_complexity(code, complexity)
return warnings
def check_code(code, ignore=(), complexity=-1):
flake8_style = get_style_guide(
config_file=DEFAULT_CONFIG, ignore=ignore, max_complexity=complexity)
return flake8_style.input_file('-', lines=code.splitlines())
def read_stdin():
# wait for 1 second on the stdin fd
reads, __, __ = select.select([sys.stdin], [], [], 1.)
if reads == []:
print('input not specified')
raise SystemExit(1)
class Flake8Command(setuptools.Command):
description = "Run flake8 on modules registered in setuptools"
user_options = []
return sys.stdin.read()
def initialize_options(self):
pass
def finalize_options(self):
pass
try:
from setuptools import Command
except ImportError:
Flake8Command = None
else:
class Flake8Command(Command):
description = "Run flake8 on modules registered in setuptools"
user_options = []
def distribution_files(self):
if self.distribution.packages:
for package in self.distribution.packages:
yield package.replace(".", os.path.sep)
def initialize_options(self):
pass
if self.distribution.py_modules:
for filename in self.distribution.py_modules:
yield "%s.py" % filename
def finalize_options(self):
pass
def distribution_files(self):
if self.distribution.packages:
for package in self.distribution.packages:
yield package.replace(".", os.path.sep)
if self.distribution.py_modules:
for filename in self.distribution.py_modules:
yield "%s.py" % filename
def run(self):
_initpep8()
# _get_python_files can produce the same file several
# times, if one of its paths is a parent of another. Keep
# a set of checked files to de-duplicate.
checked = set()
warnings = 0
for path in _get_python_files(self.distribution_files()):
if path not in checked:
warnings += check_file(path)
checked.add(path)
raise SystemExit(warnings > 0)
def _get_python_files(paths):
for path in paths:
if os.path.isdir(path):
for dirpath, dirnames, filenames in os.walk(path):
if pep8style.excluded(dirpath):
dirnames[:] = []
continue
for filename in filenames:
if not filename.endswith('.py'):
continue
fullpath = os.path.join(dirpath, filename)
if not skip_file(fullpath) or pep8style.excluded(fullpath):
yield fullpath
else:
if not skip_file(path) or pep8style.excluded(path):
yield path
def run(self):
flake8_style = get_style_guide(config_file=DEFAULT_CONFIG)
paths = self.distribution_files()
report = flake8_style.check_files(paths)
raise SystemExit(report.total_errors > 0)

View file

@ -1,299 +0,0 @@
""" Meager code path measurement tool.
Ned Batchelder
http://nedbatchelder.com/blog/200803/python_code_complexity_microtool.html
MIT License.
"""
try:
from compiler import parse # NOQA
iter_child_nodes = None # NOQA
except ImportError:
from ast import parse, iter_child_nodes # NOQA
import optparse
import sys
from flake8.util import skip_warning
from collections import defaultdict
WARNING_CODE = "W901"
class ASTVisitor:
VERBOSE = 0
def __init__(self):
self.node = None
self._cache = {}
def default(self, node, *args):
if hasattr(node, 'getChildNodes'):
children = node.getChildNodes()
else:
children = iter_child_nodes(node)
for child in children:
self.dispatch(child, *args)
def dispatch(self, node, *args):
self.node = node
klass = node.__class__
meth = self._cache.get(klass)
if meth is None:
className = klass.__name__
meth = getattr(self.visitor, 'visit' + className, self.default)
self._cache[klass] = meth
return meth(node, *args)
def preorder(self, tree, visitor, *args):
"""Do preorder walk of tree using visitor"""
self.visitor = visitor
visitor.visit = self.dispatch
self.dispatch(tree, *args) # XXX *args make sense?
class PathNode:
def __init__(self, name, look="circle"):
self.name = name
self.look = look
def to_dot(self):
print('node [shape=%s,label="%s"] %d;' % (
self.look, self.name, self.dot_id()))
def dot_id(self):
return id(self)
class PathGraph:
def __init__(self, name, entity, lineno):
self.name = name
self.entity = entity
self.lineno = lineno
self.nodes = defaultdict(list)
def connect(self, n1, n2):
self.nodes[n1].append(n2)
def to_dot(self):
print('subgraph {')
for node in self.nodes:
node.to_dot()
for node, nexts in self.nodes.items():
for next in nexts:
print('%s -- %s;' % (node.dot_id(), next.dot_id()))
print('}')
def complexity(self):
""" Return the McCabe complexity for the graph.
V-E+2
"""
num_edges = sum([len(n) for n in self.nodes.values()])
num_nodes = len(self.nodes)
return num_edges - num_nodes + 2
class PathGraphingAstVisitor(ASTVisitor):
""" A visitor for a parsed Abstract Syntax Tree which finds executable
statements.
"""
def __init__(self):
ASTVisitor.__init__(self)
self.classname = ""
self.graphs = {}
self.reset()
def reset(self):
self.graph = None
self.tail = None
def visitFunction(self, node):
if self.classname:
entity = '%s%s' % (self.classname, node.name)
else:
entity = node.name
name = '%d:1: %r' % (node.lineno, entity)
if self.graph is not None:
# closure
pathnode = self.appendPathNode(name)
self.tail = pathnode
self.default(node)
bottom = PathNode("", look='point')
self.graph.connect(self.tail, bottom)
self.graph.connect(pathnode, bottom)
self.tail = bottom
else:
self.graph = PathGraph(name, entity, node.lineno)
pathnode = PathNode(name)
self.tail = pathnode
self.default(node)
self.graphs["%s%s" % (self.classname, node.name)] = self.graph
self.reset()
visitFunctionDef = visitFunction
def visitClass(self, node):
old_classname = self.classname
self.classname += node.name + "."
self.default(node)
self.classname = old_classname
def appendPathNode(self, name):
if not self.tail:
return
pathnode = PathNode(name)
self.graph.connect(self.tail, pathnode)
self.tail = pathnode
return pathnode
def visitSimpleStatement(self, node):
if node.lineno is None:
lineno = 0
else:
lineno = node.lineno
name = "Stmt %d" % lineno
self.appendPathNode(name)
visitAssert = visitAssign = visitAssTuple = visitPrint = \
visitPrintnl = visitRaise = visitSubscript = visitDecorators = \
visitPass = visitDiscard = visitGlobal = visitReturn = \
visitSimpleStatement
def visitLoop(self, node):
name = "Loop %d" % node.lineno
if self.graph is None:
# global loop
self.graph = PathGraph(name, name, node.lineno)
pathnode = PathNode(name)
self.tail = pathnode
self.default(node)
self.graphs["%s%s" % (self.classname, name)] = self.graph
self.reset()
else:
pathnode = self.appendPathNode(name)
self.tail = pathnode
self.default(node.body)
bottom = PathNode("", look='point')
self.graph.connect(self.tail, bottom)
self.graph.connect(pathnode, bottom)
self.tail = bottom
# TODO: else clause in node.else_
visitFor = visitWhile = visitLoop
def visitIf(self, node):
name = "If %d" % node.lineno
pathnode = self.appendPathNode(name)
if not pathnode:
return # TODO: figure out what to do with if's outside def's.
loose_ends = []
for t, n in node.tests:
self.tail = pathnode
self.default(n)
loose_ends.append(self.tail)
if node.else_:
self.tail = pathnode
self.default(node.else_)
loose_ends.append(self.tail)
else:
loose_ends.append(pathnode)
bottom = PathNode("", look='point')
for le in loose_ends:
self.graph.connect(le, bottom)
self.tail = bottom
# TODO: visitTryExcept
# TODO: visitTryFinally
# TODO: visitWith
# XXX todo: determine which ones can add to the complexity
# py2
# TODO: visitStmt
# TODO: visitAssName
# TODO: visitCallFunc
# TODO: visitConst
# py3
# TODO: visitStore
# TODO: visitCall
# TODO: visitLoad
# TODO: visitNum
# TODO: visitarguments
# TODO: visitExpr
def get_code_complexity(code, min=7, filename='stdin'):
complex = []
try:
ast = parse(code)
except (AttributeError, SyntaxError):
e = sys.exc_info()[1]
sys.stderr.write("Unable to parse %s: %s\n" % (filename, e))
return 0
visitor = PathGraphingAstVisitor()
visitor.preorder(ast, visitor)
for graph in visitor.graphs.values():
if graph is None:
# ?
continue
if graph.complexity() >= min:
graph.filename = filename
if not skip_warning(graph):
msg = '%s:%d:1: %s %r is too complex (%d)' % (
filename,
graph.lineno,
WARNING_CODE,
graph.entity,
graph.complexity(),
)
complex.append(msg)
if len(complex) == 0:
return 0
print('\n'.join(complex))
return len(complex)
def get_module_complexity(module_path, min=7):
"""Returns the complexity of a module"""
code = open(module_path, "rU").read() + '\n\n'
return get_code_complexity(code, min, filename=module_path)
def main(argv):
opar = optparse.OptionParser()
opar.add_option("-d", "--dot", dest="dot",
help="output a graphviz dot file", action="store_true")
opar.add_option("-m", "--min", dest="min",
help="minimum complexity for output", type="int",
default=2)
options, args = opar.parse_args(argv)
text = open(args[0], "rU").read() + '\n\n'
ast = parse(text)
visitor = PathGraphingAstVisitor()
visitor.preorder(ast, visitor)
if options.dot:
print('graph {')
for graph in visitor.graphs.values():
if graph.complexity() >= options.min:
graph.to_dot()
print('}')
else:
for graph in visitor.graphs.values():
if graph.complexity() >= options.min:
print(graph.name, graph.complexity())
if __name__ == '__main__':
main(sys.argv[1:])

View file

@ -2,10 +2,9 @@
"""
Implementation of the command-line I{flake8} tool.
"""
from flake8.hooks import git_hook, hg_hook # noqa
from flake8.hooks import git_hook, hg_hook # noqa
from flake8.main import check_code, check_file, Flake8Command # noqa
from flake8.main import main
from flake8.main import (check_code, check_file, get_parser, # noqa
Flake8Command) # noqa
if __name__ == '__main__':

View file

@ -1,8 +1,30 @@
# -*- coding: utf-8 -*-
import sys
from unittest import TestCase
from pyflakes.api import check
code = """
class FlakesTestReporter(object):
def __init__(self):
self.messages = []
self.flakes = self.messages.append
def unexpectedError(self, filename, msg):
self.flakes('[unexpectedError] %s: %s' % (filename, msg))
def syntaxError(self, filename, msg, lineno, offset, text):
self.flakes('[syntaxError] %s:%d: %s' % (filename, lineno, msg))
code0 = """
try:
pass
except ValueError, err:
print(err)
"""
code1 = """
try:
pass
except ValueError as err:
@ -48,14 +70,25 @@ except foo.SomeException:
class TestFlake(TestCase):
def test_exception(self):
for c in (code, code2, code3):
warnings = check(code, '(stdin)')
codes = [code1, code2, code3]
if sys.version_info < (2, 6):
codes[0] = code0
elif sys.version_info < (3,):
codes.insert(0, code0)
for code in codes:
reporter = FlakesTestReporter()
warnings = check(code, '(stdin)', reporter)
self.assertFalse(reporter.messages)
self.assertEqual(warnings, 0)
def test_from_import_exception_in_scope(self):
warnings = check(code_from_import_exception, '(stdin)')
reporter = FlakesTestReporter()
warnings = check(code_from_import_exception, '(stdin)', reporter)
self.assertFalse(reporter.messages)
self.assertEqual(warnings, 0)
def test_import_exception_in_scope(self):
warnings = check(code_import_exception, '(stdin)')
reporter = FlakesTestReporter()
warnings = check(code_import_exception, '(stdin)', reporter)
self.assertFalse(reporter.messages)
self.assertEqual(warnings, 0)

View file

@ -1,41 +0,0 @@
import unittest
import sys
try:
from StringIO import StringIO
except ImportError:
from io import StringIO # NOQA
from flake8.mccabe import get_code_complexity
_GLOBAL = """\
for i in range(10):
pass
def a():
def b():
def c():
pass
c()
b()
"""
class McCabeTest(unittest.TestCase):
def setUp(self):
self.old = sys.stdout
self.out = sys.stdout = StringIO()
def tearDown(self):
sys.sdtout = self.old
def test_sample(self):
self.assertEqual(get_code_complexity(_GLOBAL, 1), 2)
self.out.seek(0)
res = self.out.read().strip().split('\n')
wanted = ["stdin:5:1: W901 'a' is too complex (4)",
"stdin:2:1: W901 'Loop 2' is too complex (2)"]
self.assertEqual(res, wanted)

View file

@ -1,161 +1,37 @@
from __future__ import with_statement
import re
import os
import sys
from io import StringIO
import pep8
import pyflakes
from pyflakes import reporter, messages
# -*- coding: utf-8 -*-
try:
# Python 2
from ConfigParser import ConfigParser
except ImportError:
# Python 3
from configparser import ConfigParser
import ast
iter_child_nodes = ast.iter_child_nodes
except ImportError: # Python 2.5
import _ast as ast
pep8style = None
if 'decorator_list' not in ast.ClassDef._fields:
# Patch the missing attribute 'decorator_list'
ast.ClassDef.decorator_list = ()
ast.FunctionDef.decorator_list = property(lambda s: s.decorators)
def get_parser():
"""Create a custom OptionParser"""
from flake8 import __version__
parser = pep8.get_parser()
def version(option, opt, value, parser):
parser.print_usage()
parser.print_version()
sys.exit(0)
parser.version = '{0} (pep8: {1}, pyflakes: {2})'.format(
__version__, pep8.__version__, pyflakes.__version__)
parser.remove_option('--version')
parser.add_option('--builtins', default='', dest='builtins',
help="append builtin functions to pyflakes' "
"_MAGIC_BUILTINS")
parser.add_option('--exit-zero', action='store_true', default=False,
help='Exit with status 0 even if there are errors')
parser.add_option('--max-complexity', default=-1, action='store',
type='int', help='McCabe complexity threshold')
parser.add_option('--install-hook', default=False, action='store_true',
help='Install the appropriate hook for this '
'repository.', dest='install_hook')
# don't overlap with pep8's verbose option
parser.add_option('-V', '--version', action='callback',
callback=version,
help='Print the version info for flake8')
parser.prog = os.path.basename(sys.argv[0])
return parser
def skip_warning(warning, ignore=[]):
# XXX quick dirty hack, just need to keep the line in the warning
if not hasattr(warning, 'message') or ignore is None:
# McCabe's warnings cannot be skipped afaik, and they're all strings.
# And we'll get a TypeError otherwise
return False
if warning.message.split()[0] in ignore:
return True
if not os.path.isfile(warning.filename):
return False
# XXX should cache the file in memory
with open(warning.filename) as f:
line = f.readlines()[warning.lineno - 1]
return skip_line(line)
def skip_line(line):
def _noqa(line):
return line.strip().lower().endswith('# noqa')
skip = _noqa(line)
if not skip:
i = line.rfind(' #')
skip = _noqa(line[:i]) if i > 0 else False
return skip
_NOQA = re.compile(r'flake8[:=]\s*noqa', re.I | re.M)
def skip_file(path, source=None):
"""Returns True if this header is found in path
# flake8: noqa
"""
if os.path.isfile(path):
f = open(path)
elif source:
f = StringIO(source)
else:
return False
try:
content = f.read()
finally:
f.close()
return _NOQA.search(content) is not None
def _initpep8(config_file=True):
# default pep8 setup
global pep8style
import pep8
if pep8style is None:
pep8style = pep8.StyleGuide(config_file=config_file)
pep8style.options.physical_checks = pep8.find_checks('physical_line')
pep8style.options.logical_checks = pep8.find_checks('logical_line')
pep8style.options.counters = dict.fromkeys(pep8.BENCHMARK_KEYS, 0)
pep8style.options.messages = {}
if not pep8style.options.max_line_length:
pep8style.options.max_line_length = 79
pep8style.args = []
return pep8style
error_mapping = {
'W402': (messages.UnusedImport,),
'W403': (messages.ImportShadowedByLoopVar,),
'W404': (messages.ImportStarUsed,),
'W405': (messages.LateFutureImport,),
'W801': (messages.RedefinedWhileUnused,
messages.RedefinedInListComp,),
'W802': (messages.UndefinedName,),
'W803': (messages.UndefinedExport,),
'W804': (messages.UndefinedLocal,
messages.UnusedVariable,),
'W805': (messages.DuplicateArgument,),
'W806': (messages.Redefined,),
}
class Flake8Reporter(reporter.Reporter):
"""Our own instance of a Reporter so that we can silence some messages."""
class_mapping = dict((k, c) for (c, v) in error_mapping.items() for k in v)
def __init__(self, ignore=None):
super(Flake8Reporter, self).__init__(sys.stdout, sys.stderr)
self.ignore = ignore or []
self.ignored_warnings = 0
def flake(self, message):
classes = [error_mapping[i] for i in self.ignore if i in error_mapping]
if (any(isinstance(message, c) for c in classes) or
skip_warning(message)):
self.ignored_warnings += 1
def iter_child_nodes(node):
"""
Yield all direct child nodes of *node*, that is, all fields that
are nodes and all items of fields that are lists of nodes.
"""
if not node._fields:
return
m = self.to_str(message)
i = m.rfind(':') + 1
message = '{0} {1}{2}'.format(
m[:i], self.class_mapping[message.__class__], m[i:]
)
for name in node._fields:
field = getattr(node, name, None)
if isinstance(field, ast.AST):
yield field
elif isinstance(field, list):
for item in field:
if isinstance(item, ast.AST):
yield item
super(Flake8Reporter, self).flake(message)
def to_str(self, message):
try:
return unicode(message)
except NameError:
return str(message)
class OrderedSet(list):
"""List without duplicates."""
__slots__ = ()
def add(self, value):
if value not in self:
self.append(value)