Merged refactor branch. We're nearing 2.0

This commit is contained in:
Ian Cordasco 2013-01-22 09:30:33 -05:00
commit 8f5bb5dc0d
13 changed files with 414 additions and 3086 deletions

View file

@ -6,3 +6,6 @@ bin
flake8.egg-info
man
\.Python
nose*
.*\.swp
.*.orig

View file

@ -151,7 +151,7 @@ files listed in your ``py_modules`` and ``packages``. If any warnings
are found, the command will exit with an error code::
$ python setup.py flake8
Original projects
@ -160,8 +160,9 @@ Original projects
Flake8 is just a glue project, all the merits go to the creators of the original
projects:
- pep8: http://github.com/jcrocholl/pep8/
- pep8: https://github.com/jcrocholl/pep8/
- PyFlakes: http://divmod.org/trac/wiki/DivmodPyflakes
- flakey: https://bitbucket.org/icordasc/flakey
- McCabe: http://nedbatchelder.com/blog/200803/python_code_complexity_microtool.html
Warning / Error codes
@ -207,7 +208,7 @@ pep8:
- W603: '<>' is deprecated, use '!='
- W604: backticks are deprecated, use 'repr()'
pyflakes:
flakey:
- W402: <module> imported but unused
- W403: import <module> from line <n> shadowed by loop variable
@ -228,6 +229,15 @@ McCabe:
CHANGES
=======
2.0.0 - 2013-01-xx
------------------
- Fixes #13: pep8 and flakey are now external dependencies
- Split run.py into main.py and hooks.py for better logic
- Expose our parser for our users
- New feature: Install git and hg hooks automagically
- By relying on flakey, we also fixed #45 and #35
1.7.0 - 2012-12-21
------------------

View file

@ -1,3 +1,2 @@
#
__version__ = '1.7.0'
__version__ = '2.0'

145
flake8/hooks.py Normal file
View file

@ -0,0 +1,145 @@
import os
import sys
from flake8.util import (_initpep8, pep8style, skip_file, get_parser,
ConfigParser)
from subprocess import Popen, PIPE
def git_hook(complexity=-1, strict=False, ignore=None, lazy=False):
from flake8.main import check_file
_initpep8()
if ignore:
pep8style.options.ignore = ignore
warnings = 0
gitcmd = "git diff-index --cached --name-only HEAD"
if lazy:
gitcmd = gitcmd.replace('--cached ', '')
_, files_modified, _ = run(gitcmd)
for filename in files_modified:
ext = os.path.splitext(filename)[-1]
if ext != '.py':
continue
if not os.path.exists(filename):
continue
warnings += check_file(path=filename, ignore=ignore,
complexity=complexity)
if strict:
return warnings
return 0
def hg_hook(ui, repo, **kwargs):
from flake8.main import check_file
complexity = ui.config('flake8', 'complexity', default=-1)
config = ui.config('flake8', 'config', default=True)
_initpep8(config_file=config)
warnings = 0
for file_ in _get_files(repo, **kwargs):
warnings += check_file(file_, complexity)
strict = ui.configbool('flake8', 'strict', default=True)
if strict:
return warnings
return 0
def run(command):
p = Popen(command.split(), stdout=PIPE, stderr=PIPE)
p.wait()
return (p.returncode, [line.strip() for line in p.stdout.readlines()],
[line.strip() for line in p.stderr.readlines()])
def _get_files(repo, **kwargs):
seen = set()
for rev in range(repo[kwargs['node']], len(repo)):
for file_ in repo[rev].files():
file_ = os.path.join(repo.root, file_)
if file_ in seen or not os.path.exists(file_):
continue
seen.add(file_)
if not file_.endswith('.py'):
continue
if skip_file(file_):
continue
yield file_
def find_vcs():
if os.path.isdir('.git'):
if not os.path.isdir('.git/hooks'):
os.mkdir('.git/hooks')
return '.git/hooks/pre-commit'
elif os.path.isdir('.hg'):
return '.hg/hgrc'
return ''
git_hook_file = """#!/usr/bin/env python
import sys
import os
from flake8.hooks import git_hook
COMPLEXITY = os.getenv('FLAKE8_COMPLEXITY', 10)
STRICT = os.getenv('FLAKE8_STRICT', False)
if __name__ == '__main__':
sys.exit(git_hook(complexity=COMPLEXITY, strict=STRICT))
"""
def _install_hg_hook(path):
c = ConfigParser()
c.readfp(open(path, 'r'))
if not c.has_section('hooks'):
c.add_section('hooks')
if not c.has_option('hooks', 'commit'):
c.set('hooks', 'commit', 'python:flake8.hooks.hg_hook')
if not c.has_option('hooks', 'qrefresh'):
c.set('hooks', 'qrefresh', 'python:flake8.hooks.hg_hook')
if not c.has_section('flake8'):
c.add_section('flake8')
if not c.has_option('flake8', 'complexity'):
c.set('flake8', 'complexity', str(os.getenv('FLAKE8_COMPLEXITY', 10)))
if not c.has_option('flake8', 'strict'):
c.set('flake8', 'strict', os.getenv('FLAKE8_STRICT', False))
c.write(open(path, 'w+'))
def install_hook():
vcs = find_vcs()
if not vcs:
p = get_parser()
sys.stderr.write('Error: could not find either a git or mercurial '
'directory. Please re-run this in a proper '
'repository.')
p.print_help()
sys.exit(1)
status = 0
if 'git' in vcs:
with open(vcs, 'w+') as fd:
fd.write(git_hook_file)
os.chmod(vcs, 744)
elif 'hg' in vcs:
_install_hg_hook(vcs)
else:
status = 1
sys.exit(status)

155
flake8/main.py Normal file
View file

@ -0,0 +1,155 @@
import os
import sys
import pep8
import flakey
import select
from flake8 import mccabe
from flake8.util import _initpep8, skip_file, get_parser
pep8style = None
def main():
global pep8style
# parse out our flags so pep8 doesn't get confused
parser = get_parser()
opts, _ = parser.parse_args()
if opts.install_hook:
from flake8.hooks import install_hook
install_hook()
if opts.builtins:
s = '--builtins={0}'.format(opts.builtins)
sys.argv.remove(s)
if opts.exit_zero:
sys.argv.remove('--exit-zero')
if opts.install_hook:
sys.argv.remove('--install-hook')
complexity = opts.max_complexity
if complexity > 0:
sys.argv.remove('--max-complexity={0}'.format(complexity))
# make sure pep8 gets the information it expects
sys.argv.pop(0)
sys.argv.insert(0, 'pep8')
pep8style = pep8.StyleGuide(parse_argv=True, config_file=True)
options = pep8style.options
warnings = 0
stdin = None
builtins = set(opts.builtins.split(','))
if builtins:
orig_builtins = set(flakey.checker._MAGIC_GLOBALS)
flakey.checker._MAGIC_GLOBALS = orig_builtins | builtins
if pep8style.paths and options.filename is not None:
for path in _get_python_files(pep8style.paths):
if path == '-':
if stdin is None:
stdin = read_stdin()
warnings += check_code(stdin, opts.ignore, complexity)
else:
warnings += check_file(path, opts.ignore, complexity)
else:
stdin = read_stdin()
warnings += check_code(stdin, opts.ignore, complexity)
if opts.exit_zero:
raise SystemExit(0)
raise SystemExit(warnings)
def check_file(path, ignore=(), complexity=-1):
if pep8style.excluded(path):
return 0
warning = flakey.checkPath(path)
warnings = flakey.print_messages(warning, ignore=ignore)
warnings += pep8style.input_file(path)
if complexity > -1:
warnings += mccabe.get_module_complexity(path, complexity)
return warnings
def check_code(code, ignore=(), complexity=-1):
warning = flakey.check(code, '<stdin>')
warnings = flakey.print_messages(warning, ignore=ignore, code=code)
warnings += pep8style.input_file('-', lines=code.split('\n'))
if complexity > -1:
warnings += mccabe.get_code_complexity(code, complexity)
return warnings
def read_stdin():
# wait for 1 second on the stdin fd
reads, __, __ = select.select([sys.stdin], [], [], 1.)
if reads == []:
print('input not specified')
raise SystemExit(1)
return sys.stdin.read()
try:
from setuptools import Command
except ImportError:
Flake8Command = None
else:
class Flake8Command(Command):
description = "Run flake8 on modules registered in setuptools"
user_options = []
def initialize_options(self):
pass
def finalize_options(self):
pass
def distribution_files(self):
if self.distribution.packages:
for package in self.distribution.packages:
yield package.replace(".", os.path.sep)
if self.distribution.py_modules:
for filename in self.distribution.py_modules:
yield "%s.py" % filename
def run(self):
_initpep8()
# _get_python_files can produce the same file several
# times, if one of its paths is a parent of another. Keep
# a set of checked files to de-duplicate.
checked = set()
warnings = 0
for path in _get_python_files(self.distribution_files()):
if path not in checked:
warnings += check_file(path)
checked.add(path)
raise SystemExit(warnings > 0)
def _get_python_files(paths):
for path in paths:
if os.path.isdir(path):
for dirpath, dirnames, filenames in os.walk(path):
if pep8style.excluded(dirpath):
dirnames[:] = []
continue
for filename in filenames:
if not filename.endswith('.py'):
continue
fullpath = os.path.join(dirpath, filename)
if not skip_file(fullpath) or pep8style.excluded(fullpath):
yield fullpath
else:
if not skip_file(path) or pep8style.excluded(path):
yield path

View file

@ -58,8 +58,8 @@ class PathNode:
self.look = look
def to_dot(self):
print('node [shape=%s,label="%s"] %d;' %
(self.look, self.name, self.dot_id()))
print('node [shape=%s,label="%s"] %d;' % (
self.look, self.name, self.dot_id()))
def dot_id(self):
return id(self)

View file

@ -1,113 +0,0 @@
# (c) 2005 Divmod, Inc. See LICENSE file for details
class Message(object):
message = ''
message_args = ()
def __init__(self, filename, lineno):
self.filename = filename
self.lineno = lineno
def __str__(self):
return '%s:%s: %s' % (self.filename, self.lineno,
self.message % self.message_args)
def __lt__(self, other):
if self.filename != other.filename:
return self.filename < other.filename
return self.lineno < other.lineno
class UnusedImport(Message):
message = 'W402 %r imported but unused'
def __init__(self, filename, lineno, name):
Message.__init__(self, filename, lineno)
self.message_args = (name,)
class RedefinedWhileUnused(Message):
message = 'W801 redefinition of unused %r from line %r'
def __init__(self, filename, lineno, name, orig_lineno):
Message.__init__(self, filename, lineno)
self.message_args = (name, orig_lineno)
class ImportShadowedByLoopVar(Message):
message = 'W403 import %r from line %r shadowed by loop variable'
def __init__(self, filename, lineno, name, orig_lineno):
Message.__init__(self, filename, lineno)
self.message_args = (name, orig_lineno)
class ImportStarUsed(Message):
message = "W404 'from %s import *' used; unable to detect undefined names"
def __init__(self, filename, lineno, modname):
Message.__init__(self, filename, lineno)
self.message_args = (modname,)
class UndefinedName(Message):
message = 'W802 undefined name %r'
def __init__(self, filename, lineno, name):
Message.__init__(self, filename, lineno)
self.message_args = (name,)
class UndefinedExport(Message):
message = 'W803 undefined name %r in __all__'
def __init__(self, filename, lineno, name):
Message.__init__(self, filename, lineno)
self.message_args = (name,)
class UndefinedLocal(Message):
message = "W804 local variable %r (defined in enclosing scope on line " \
"%r) referenced before assignment"
def __init__(self, filename, lineno, name, orig_lineno):
Message.__init__(self, filename, lineno)
self.message_args = (name, orig_lineno)
class DuplicateArgument(Message):
message = 'W805 duplicate argument %r in function definition'
def __init__(self, filename, lineno, name):
Message.__init__(self, filename, lineno)
self.message_args = (name,)
class RedefinedFunction(Message):
message = 'W806 redefinition of function %r from line %r'
def __init__(self, filename, lineno, name, orig_lineno):
Message.__init__(self, filename, lineno)
self.message_args = (name, orig_lineno)
class LateFutureImport(Message):
message = 'W405 future import(s) %r after other statements'
def __init__(self, filename, lineno, names):
Message.__init__(self, filename, lineno)
self.message_args = (names,)
class UnusedVariable(Message):
"""
Indicates that a variable has been explicity assigned to but not actually
used.
"""
message = 'W806 local variable %r is assigned to but never used'
def __init__(self, filename, lineno, names):
Message.__init__(self, filename, lineno)
self.message_args = (names,)

File diff suppressed because it is too large Load diff

View file

@ -1,723 +0,0 @@
# -*- test-case-name: pyflakes -*-
# (c) 2005-2010 Divmod, Inc.
# See LICENSE file for details
try:
import __builtin__ # NOQA
except ImportError:
import builtins as __builtin__ # NOQA
import os.path
import _ast
import sys
from flake8 import messages
from flake8.util import skip_warning
__version__ = '0.5.0'
# utility function to iterate over an AST node's children, adapted
# from Python 2.6's standard ast module
try:
import ast
iter_child_nodes = ast.iter_child_nodes
except (ImportError, AttributeError):
def iter_child_nodes(node, astcls=_ast.AST):
"""
Yield all direct child nodes of *node*, that is, all fields that are
nodes and all items of fields that are lists of nodes.
"""
for name in node._fields:
field = getattr(node, name, None)
if isinstance(field, astcls):
yield field
elif isinstance(field, list):
for item in field:
yield item
class Binding(object):
"""
Represents the binding of a value to a name.
The checker uses this to keep track of which names have been bound and
which names have not. See L{Assignment} for a special type of binding that
is checked with stricter rules.
@ivar used: pair of (L{Scope}, line-number) indicating the scope and
line number that this binding was last used
"""
def __init__(self, name, source):
self.name = name
self.source = source
self.used = False
def __str__(self):
return self.name
def __repr__(self):
return '<%s object %r from line %r at 0x%x>' % (
self.__class__.__name__,
self.name,
self.source.lineno,
id(self))
class UnBinding(Binding):
'''Created by the 'del' operator.'''
class Importation(Binding):
"""
A binding created by an import statement.
@ivar fullName: The complete name given to the import statement,
possibly including multiple dotted components.
@type fullName: C{str}
"""
def __init__(self, name, source):
self.fullName = name
name = name.split('.')[0]
super(Importation, self).__init__(name, source)
class Argument(Binding):
"""
Represents binding a name as an argument.
"""
class Assignment(Binding):
"""
Represents binding a name with an explicit assignment.
The checker will raise warnings for any Assignment that isn't used. Also,
the checker does not consider assignments in tuple/list unpacking to be
Assignments, rather it treats them as simple Bindings.
"""
class FunctionDefinition(Binding):
pass
class ExportBinding(Binding):
"""
A binding created by an C{__all__} assignment. If the names in the list
can be determined statically, they will be treated as names for export and
additional checking applied to them.
The only C{__all__} assignment that can be recognized is one which takes
the value of a literal list containing literal strings. For example::
__all__ = ["foo", "bar"]
Names which are imported and not otherwise used but appear in the value of
C{__all__} will not have an unused import warning reported for them.
"""
def names(self):
"""
Return a list of the names referenced by this binding.
"""
names = []
if isinstance(self.source, _ast.List):
for node in self.source.elts:
if isinstance(node, _ast.Str):
names.append(node.s)
return names
class Scope(dict):
importStarred = False # set to True when import * is found
def __repr__(self):
return '<%s at 0x%x %s>' % (self.__class__.__name__,
id(self),
dict.__repr__(self))
def __init__(self):
super(Scope, self).__init__()
class ClassScope(Scope):
pass
class FunctionScope(Scope):
"""
I represent a name scope for a function.
@ivar globals: Names declared 'global' in this function.
"""
def __init__(self):
super(FunctionScope, self).__init__()
self.globals = {}
class ModuleScope(Scope):
pass
# Globally defined names which are not attributes of the __builtin__ module.
_MAGIC_GLOBALS = ['__file__', '__builtins__']
class Checker(object):
"""
I check the cleanliness and sanity of Python code.
@ivar _deferredFunctions: Tracking list used by L{deferFunction}. Elements
of the list are two-tuples. The first element is the callable passed
to L{deferFunction}. The second element is a copy of the scope stack
at the time L{deferFunction} was called.
@ivar _deferredAssignments: Similar to C{_deferredFunctions}, but for
callables which are deferred assignment checks.
"""
nodeDepth = 0
traceTree = False
def __init__(self, tree, filename='(none)'):
self._deferredFunctions = []
self._deferredAssignments = []
self.dead_scopes = []
self.messages = []
self.filename = filename
self.scopeStack = [ModuleScope()]
self.futuresAllowed = True
self.handleChildren(tree)
self._runDeferred(self._deferredFunctions)
# Set _deferredFunctions to None so that deferFunction will fail
# noisily if called after we've run through the deferred functions.
self._deferredFunctions = None
self._runDeferred(self._deferredAssignments)
# Set _deferredAssignments to None so that deferAssignment will fail
# noisly if called after we've run through the deferred assignments.
self._deferredAssignments = None
del self.scopeStack[1:]
self.popScope()
self.check_dead_scopes()
def deferFunction(self, callable):
'''
Schedule a function handler to be called just before completion.
This is used for handling function bodies, which must be deferred
because code later in the file might modify the global scope. When
`callable` is called, the scope at the time this is called will be
restored, however it will contain any new bindings added to it.
'''
self._deferredFunctions.append((callable, self.scopeStack[:]))
def deferAssignment(self, callable):
"""
Schedule an assignment handler to be called just after deferred
function handlers.
"""
self._deferredAssignments.append((callable, self.scopeStack[:]))
def _runDeferred(self, deferred):
"""
Run the callables in C{deferred} using their associated scope stack.
"""
for handler, scope in deferred:
self.scopeStack = scope
handler()
def scope(self):
return self.scopeStack[-1]
scope = property(scope)
def popScope(self):
self.dead_scopes.append(self.scopeStack.pop())
def check_dead_scopes(self):
"""
Look at scopes which have been fully examined and report names in them
which were imported but unused.
"""
for scope in self.dead_scopes:
export = isinstance(scope.get('__all__'), ExportBinding)
if export:
all = scope['__all__'].names()
if os.path.split(self.filename)[1] != '__init__.py':
# Look for possible mistakes in the export list
undefined = set(all) - set(scope)
for name in undefined:
self.report(
messages.UndefinedExport,
scope['__all__'].source.lineno,
name)
else:
all = []
# Look for imported names that aren't used.
for importation in scope.values():
if isinstance(importation, Importation):
if not importation.used and importation.name not in all:
self.report(
messages.UnusedImport,
importation.source.lineno,
importation.name)
def pushFunctionScope(self):
self.scopeStack.append(FunctionScope())
def pushClassScope(self):
self.scopeStack.append(ClassScope())
def report(self, messageClass, *args, **kwargs):
self.messages.append(messageClass(self.filename, *args, **kwargs))
def handleChildren(self, tree):
for node in iter_child_nodes(tree):
self.handleNode(node, tree)
def isDocstring(self, node):
"""
Determine if the given node is a docstring, as long as it is at the
correct place in the node tree.
"""
return isinstance(node, _ast.Str) or \
(isinstance(node, _ast.Expr) and
isinstance(node.value, _ast.Str))
def handleNode(self, node, parent):
node.parent = parent
if self.traceTree:
print(' ' * self.nodeDepth + node.__class__.__name__)
self.nodeDepth += 1
if self.futuresAllowed and not \
(isinstance(node, _ast.ImportFrom) or self.isDocstring(node)):
self.futuresAllowed = False
nodeType = node.__class__.__name__.upper()
try:
handler = getattr(self, nodeType)
handler(node)
finally:
self.nodeDepth -= 1
if self.traceTree:
print(' ' * self.nodeDepth + 'end ' + node.__class__.__name__)
def ignore(self, node):
pass
# "stmt" type nodes
RETURN = DELETE = PRINT = WHILE = IF = WITH = RAISE = TRYEXCEPT = \
TRYFINALLY = ASSERT = EXEC = EXPR = handleChildren
CONTINUE = BREAK = PASS = ignore
# "expr" type nodes
BOOLOP = BINOP = UNARYOP = IFEXP = DICT = SET = YIELD = COMPARE = \
CALL = REPR = ATTRIBUTE = SUBSCRIPT = LIST = TUPLE = TRY = \
WITHITEM = handleChildren
NUM = STR = ELLIPSIS = ignore
# "slice" type nodes
SLICE = EXTSLICE = INDEX = handleChildren
# expression contexts are node instances too, though being constants
LOAD = STORE = DEL = AUGLOAD = AUGSTORE = PARAM = ignore
# same for operators
AND = OR = ADD = SUB = MULT = DIV = MOD = POW = LSHIFT = RSHIFT = \
BITOR = BITXOR = BITAND = FLOORDIV = INVERT = NOT = UADD = USUB = \
EQ = NOTEQ = LT = LTE = GT = GTE = IS = ISNOT = IN = NOTIN = ignore
# additional node types
COMPREHENSION = KEYWORD = handleChildren
def EXCEPTHANDLER(self, node):
if node.name is not None:
if isinstance(node.name, str):
name = node.name
elif hasattr(node.name, 'elts'):
names = [e.id for e in node.name.elts]
name = '({0})'.format(', '.join(names))
else:
name = node.name.id
self.addBinding(node.lineno, Assignment(name, node))
def runException():
for stmt in iter_child_nodes(node):
self.handleNode(stmt, node)
self.deferFunction(runException)
def addBinding(self, lineno, value, reportRedef=True):
'''Called when a binding is altered.
- `lineno` is the line of the statement responsible for the change
- `value` is the optional new value, a Binding instance, associated
with the binding; if None, the binding is deleted if it exists.
- if `reportRedef` is True (default), rebinding while unused will be
reported.
'''
if (isinstance(self.scope.get(value.name), FunctionDefinition)
and isinstance(value, FunctionDefinition)):
self.report(messages.RedefinedFunction,
lineno, value.name,
self.scope[value.name].source.lineno)
if not isinstance(self.scope, ClassScope):
for scope in self.scopeStack[::-1]:
existing = scope.get(value.name)
if (isinstance(existing, Importation)
and not existing.used
and (not isinstance(value, Importation)
or value.fullName == existing.fullName)
and reportRedef):
self.report(messages.RedefinedWhileUnused,
lineno, value.name,
scope[value.name].source.lineno)
if isinstance(value, UnBinding):
try:
del self.scope[value.name]
except KeyError:
self.report(messages.UndefinedName, lineno, value.name)
else:
self.scope[value.name] = value
def GLOBAL(self, node):
"""
Keep track of globals declarations.
"""
if isinstance(self.scope, FunctionScope):
self.scope.globals.update(dict.fromkeys(node.names))
def LISTCOMP(self, node):
# handle generators before element
for gen in node.generators:
self.handleNode(gen, node)
self.handleNode(node.elt, node)
GENERATOREXP = SETCOMP = LISTCOMP
# dictionary comprehensions; introduced in Python 2.7
def DICTCOMP(self, node):
for gen in node.generators:
self.handleNode(gen, node)
self.handleNode(node.key, node)
self.handleNode(node.value, node)
def FOR(self, node):
"""
Process bindings for loop variables.
"""
vars = []
def collectLoopVars(n):
if isinstance(n, _ast.Name):
vars.append(n.id)
elif isinstance(n, _ast.expr_context):
return
else:
for c in iter_child_nodes(n):
collectLoopVars(c)
collectLoopVars(node.target)
for varn in vars:
if (isinstance(self.scope.get(varn), Importation)
# unused ones will get an unused import warning
and self.scope[varn].used):
self.report(messages.ImportShadowedByLoopVar,
node.lineno, varn, self.scope[varn].source.lineno)
self.handleChildren(node)
def NAME(self, node):
"""
Handle occurrence of Name (which can be a load/store/delete access.)
"""
# Locate the name in locals / function / globals scopes.
if isinstance(node.ctx, (_ast.Load, _ast.AugLoad)):
# try local scope
importStarred = self.scope.importStarred
try:
self.scope[node.id].used = (self.scope, node.lineno)
except KeyError:
pass
else:
return
# try enclosing function scopes
for scope in self.scopeStack[-2:0:-1]:
importStarred = importStarred or scope.importStarred
if not isinstance(scope, FunctionScope):
continue
try:
scope[node.id].used = (self.scope, node.lineno)
except KeyError:
pass
else:
return
# try global scope
importStarred = importStarred or self.scopeStack[0].importStarred
try:
self.scopeStack[0][node.id].used = (self.scope, node.lineno)
except KeyError:
if ((not hasattr(__builtin__, node.id))
and node.id not in _MAGIC_GLOBALS
and not importStarred):
if (os.path.basename(self.filename) == '__init__.py' and
node.id == '__path__'):
# the special name __path__ is valid only in packages
pass
else:
self.report(messages.UndefinedName,
node.lineno,
node.id)
elif isinstance(node.ctx, (_ast.Store, _ast.AugStore)):
# if the name hasn't already been defined in the current scope
if isinstance(self.scope, FunctionScope) and \
node.id not in self.scope:
# for each function or module scope above us
for scope in self.scopeStack[:-1]:
if not isinstance(scope, (FunctionScope, ModuleScope)):
continue
# if the name was defined in that scope, and the name has
# been accessed already in the current scope, and hasn't
# been declared global
if (node.id in scope
and scope[node.id].used
and scope[node.id].used[0] is self.scope
and node.id not in self.scope.globals):
# then it's probably a mistake
self.report(messages.UndefinedLocal,
scope[node.id].used[1],
node.id,
scope[node.id].source.lineno)
break
if isinstance(node.parent,
(_ast.For,
_ast.comprehension,
_ast.Tuple,
_ast.List)):
binding = Binding(node.id, node)
elif (node.id == '__all__' and
isinstance(self.scope, ModuleScope)):
binding = ExportBinding(node.id, node.parent.value)
else:
binding = Assignment(node.id, node)
if node.id in self.scope:
binding.used = self.scope[node.id].used
self.addBinding(node.lineno, binding)
elif isinstance(node.ctx, _ast.Del):
if isinstance(self.scope, FunctionScope) and \
node.id in self.scope.globals:
del self.scope.globals[node.id]
else:
self.addBinding(node.lineno, UnBinding(node.id, node))
else:
# must be a Param context -- this only happens for names
# in function arguments, but these aren't dispatched through here
raise RuntimeError(
"Got impossible expression context: %r" % (node.ctx,))
def FUNCTIONDEF(self, node):
# the decorators attribute is called decorator_list as of Python 2.6
if hasattr(node, 'decorators'):
for deco in node.decorators:
self.handleNode(deco, node)
else:
for deco in node.decorator_list:
self.handleNode(deco, node)
self.addBinding(node.lineno, FunctionDefinition(node.name, node))
self.LAMBDA(node)
def LAMBDA(self, node):
for default in node.args.defaults:
self.handleNode(default, node)
def runFunction():
args = []
def addArgs(arglist):
for arg in arglist:
if isinstance(arg, _ast.Tuple):
addArgs(arg.elts)
else:
try:
id_ = arg.id
except AttributeError:
id_ = arg.arg
if id_ in args:
self.report(messages.DuplicateArgument,
node.lineno, id_)
args.append(id_)
self.pushFunctionScope()
addArgs(node.args.args)
# vararg/kwarg identifiers are not Name nodes
if node.args.vararg:
args.append(node.args.vararg)
if node.args.kwarg:
args.append(node.args.kwarg)
for name in args:
self.addBinding(node.lineno, Argument(name, node),
reportRedef=False)
if isinstance(node.body, list):
# case for FunctionDefs
for stmt in node.body:
self.handleNode(stmt, node)
else:
# case for Lambdas
self.handleNode(node.body, node)
def checkUnusedAssignments():
"""
Check to see if any assignments have not been used.
"""
for name, binding in self.scope.items():
if (not binding.used and not name in self.scope.globals
and isinstance(binding, Assignment)):
self.report(messages.UnusedVariable,
binding.source.lineno, name)
self.deferAssignment(checkUnusedAssignments)
self.popScope()
self.deferFunction(runFunction)
def CLASSDEF(self, node):
"""
Check names used in a class definition, including its decorators, base
classes, and the body of its definition. Additionally, add its name to
the current scope.
"""
# decorator_list is present as of Python 2.6
for deco in getattr(node, 'decorator_list', []):
self.handleNode(deco, node)
for baseNode in node.bases:
self.handleNode(baseNode, node)
self.pushClassScope()
for stmt in node.body:
self.handleNode(stmt, node)
self.popScope()
self.addBinding(node.lineno, Binding(node.name, node))
def ASSIGN(self, node):
self.handleNode(node.value, node)
for target in node.targets:
self.handleNode(target, node)
def AUGASSIGN(self, node):
# AugAssign is awkward: must set the context explicitly
# and visit twice, once with AugLoad context, once with
# AugStore context
node.target.ctx = _ast.AugLoad()
self.handleNode(node.target, node)
self.handleNode(node.value, node)
node.target.ctx = _ast.AugStore()
self.handleNode(node.target, node)
def IMPORT(self, node):
for alias in node.names:
name = alias.asname or alias.name
importation = Importation(name, node)
self.addBinding(node.lineno, importation)
def IMPORTFROM(self, node):
if node.module == '__future__':
if not self.futuresAllowed:
self.report(messages.LateFutureImport, node.lineno,
[n.name for n in node.names])
else:
self.futuresAllowed = False
for alias in node.names:
if alias.name == '*':
self.scope.importStarred = True
self.report(messages.ImportStarUsed, node.lineno, node.module)
continue
name = alias.asname or alias.name
importation = Importation(name, node)
if node.module == '__future__':
importation.used = (self.scope, node.lineno)
self.addBinding(node.lineno, importation)
def checkPath(filename, ignore=[]):
"""
Check the given path, printing out any warnings detected.
@return: the number of warnings printed
"""
try:
return check(open(filename, 'U').read() + '\n', ignore, filename)
except IOError:
msg = sys.exc_info()[1]
sys.stderr.write("%s: %s\n" % (filename, msg.args[1]))
return 1
def check(codeString, ignore, filename='stdin'):
"""
Check the Python source given by C{codeString} for flakes.
@param codeString: The Python source to check.
@type codeString: C{str}
@param filename: The name of the file the source came from, used to report
errors.
@type filename: C{str}
@return: The number of warnings emitted.
@rtype: C{int}
"""
# First, compile into an AST and handle syntax errors.
try:
tree = compile(codeString, filename, "exec", _ast.PyCF_ONLY_AST)
except SyntaxError:
value = sys.exc_info()[1]
msg = value.args[0]
(lineno, offset, text) = value.lineno, value.offset, value.text
# If there's an encoding problem with the file, the text is None.
if text is None:
# Avoid using msg, since for the only known case, it contains a
# bogus message that claims the encoding the file declared was
# unknown.
sys.stderr.write("%s: problem decoding source\n" % (filename))
else:
line = text.splitlines()[-1]
if offset is not None:
offset = offset - (len(text) - len(line))
sys.stderr.write('%s:%d: %s\n' % (filename, lineno, msg))
sys.stderr.write(line + '\n')
if offset is not None:
sys.stderr.write(" " * offset + "^\n")
return 1
else:
# Okay, it's syntactically valid. Now check it.
w = Checker(tree, filename)
sorting = [(msg.lineno, msg) for msg in w.messages]
sorting.sort()
w.messages = [msg for index, msg in sorting]
valid_warnings = 0
for warning in w.messages:
if skip_warning(warning, ignore):
continue
print(warning)
valid_warnings += 1
return valid_warnings

View file

@ -2,237 +2,10 @@
"""
Implementation of the command-line I{flake8} tool.
"""
import sys
import os
import os.path
from subprocess import PIPE, Popen
import select
try:
from StringIO import StringIO
except ImportError:
from io import StringIO # NOQA
from flake8.util import skip_file
from flake8 import pep8
from flake8 import pyflakes
from flake8 import mccabe
pep8style = None
def check_file(path, ignore=(), complexity=-1):
if pep8style.excluded(path):
return 0
warnings = pyflakes.checkPath(path, ignore)
warnings += pep8style.input_file(path)
if complexity > -1:
warnings += mccabe.get_module_complexity(path, complexity)
return warnings
def check_code(code, ignore=(), complexity=-1):
warnings = pyflakes.check(code, ignore, 'stdin')
warnings += pep8style.input_file(None, lines=code.split('\n'))
if complexity > -1:
warnings += mccabe.get_code_complexity(code, complexity)
return warnings
def _get_python_files(paths):
for path in paths:
if os.path.isdir(path):
for dirpath, dirnames, filenames in os.walk(path):
if pep8style.excluded(dirpath):
continue
for filename in filenames:
if not filename.endswith('.py'):
continue
fullpath = os.path.join(dirpath, filename)
if not skip_file(fullpath) or pep8style.excluded(fullpath):
yield fullpath
else:
if not skip_file(path) or pep8style.excluded(path):
yield path
def read_stdin():
# wait for 1 second on the stdin fd
reads, __, __ = select.select([sys.stdin], [], [], 1.)
if reads == []:
print('input not specified')
raise SystemExit(1)
return sys.stdin.read()
def main():
global pep8style
pep8style = pep8.StyleGuide(parse_argv=True, config_file=True)
options = pep8style.options
complexity = options.max_complexity
builtins = set(options.builtins)
warnings = 0
stdin = None
if builtins:
orig_builtins = set(pyflakes._MAGIC_GLOBALS)
pyflakes._MAGIC_GLOBALS = orig_builtins | builtins
if pep8style.paths and options.filename is not None:
for path in _get_python_files(pep8style.paths):
if path == '-':
if stdin is None:
stdin = read_stdin()
warnings += check_code(stdin, options.ignore, complexity)
else:
warnings += check_file(path, options.ignore, complexity)
else:
stdin = read_stdin()
warnings += check_code(stdin, options.ignore, complexity)
if options.exit_zero:
raise SystemExit(0)
raise SystemExit(warnings > 0)
def _get_files(repo, **kwargs):
seen = set()
for rev in range(repo[kwargs['node']], len(repo)):
for file_ in repo[rev].files():
file_ = os.path.join(repo.root, file_)
if file_ in seen or not os.path.exists(file_):
continue
seen.add(file_)
if not file_.endswith('.py'):
continue
if skip_file(file_):
continue
yield file_
class _PEP8Options(object):
# Default options taken from pep8.process_options()
max_complexity = -1
verbose = False
quiet = False
no_repeat = False
exclude = [exc.rstrip('/') for exc in pep8.DEFAULT_EXCLUDE.split(',')]
filename = ['*.py']
select = []
ignore = pep8.DEFAULT_IGNORE.split(',') # or []?
show_source = False
show_pep8 = False
statistics = False
count = False
benchmark = False
testsuite = ''
doctest = False
max_line_length = pep8.MAX_LINE_LENGTH
def _initpep8():
# default pep8 setup
global pep8style
if pep8style is None:
pep8style = pep8.StyleGuide(config_file=True)
#pep8style.options.physical_checks = pep8.find_checks('physical_line')
#pep8style.options.logical_checks = pep8.find_checks('logical_line')
pep8style.options.counters = dict.fromkeys(pep8.BENCHMARK_KEYS, 0)
pep8style.options.messages = {}
pep8style.options.max_line_length = 79
pep8style.args = []
def run(command):
p = Popen(command.split(), stdout=PIPE, stderr=PIPE)
p.wait()
return (p.returncode, [line.strip() for line in p.stdout.readlines()],
[line.strip() for line in p.stderr.readlines()])
def git_hook(complexity=-1, strict=False, ignore=None, lazy=False):
_initpep8()
if ignore:
pep8style.options.ignore = ignore
warnings = 0
gitcmd = "git diff-index --cached --name-only HEAD"
if lazy:
gitcmd = gitcmd.replace('--cached ', '')
_, files_modified, _ = run(gitcmd)
for filename in files_modified:
ext = os.path.splitext(filename)[-1]
if ext != '.py':
continue
if not os.path.exists(filename):
continue
warnings += check_file(path=filename, ignore=ignore,
complexity=complexity)
if strict:
return warnings
return 0
def hg_hook(ui, repo, **kwargs):
_initpep8()
complexity = ui.config('flake8', 'complexity', default=-1)
warnings = 0
for file_ in _get_files(repo, **kwargs):
warnings += check_file(file_, complexity)
strict = ui.configbool('flake8', 'strict', default=True)
if strict:
return warnings
return 0
try:
from setuptools import Command
except ImportError:
Flake8Command = None
else:
class Flake8Command(Command):
description = "Run flake8 on modules registered in setuptools"
user_options = []
def initialize_options(self):
pass
def finalize_options(self):
pass
def distribution_files(self):
if self.distribution.packages:
for package in self.distribution.packages:
yield package.replace(".", os.path.sep)
if self.distribution.py_modules:
for filename in self.distribution.py_modules:
yield "%s.py" % filename
def run(self):
_initpep8()
# _get_python_files can produce the same file several
# times, if one of its paths is a parent of another. Keep
# a set of checked files to de-duplicate.
checked = set()
warnings = 0
for path in _get_python_files(self.distribution_files()):
if path not in checked:
warnings += check_file(path)
checked.add(path)
raise SystemExit(warnings > 0)
from flake8.hooks import git_hook, hg_hook # noqa
from flake8.main import main
from flake8.main import (check_code, check_file, get_parser, # noqa
Flake8Command) # noqa
if __name__ == '__main__':

View file

@ -1,5 +1,5 @@
from unittest import TestCase
from flake8.pyflakes import check
from flakey import check, print_messages
code = """
@ -49,11 +49,16 @@ class TestFlake(TestCase):
def test_exception(self):
for c in (code, code2, code3):
warnings = check(code)
self.assertEqual(warnings, 0, code)
warnings = check(code, '(stdin)')
warnings = print_messages(warnings)
self.assertEqual(warnings, 0)
def test_from_import_exception_in_scope(self):
self.assertEqual(check(code_from_import_exception), 0)
warnings = check(code_from_import_exception, '(stdin)')
warnings = print_messages(warnings)
self.assertEqual(warnings, 0)
def test_import_exception_in_scope(self):
self.assertEqual(check(code_import_exception), 0)
warnings = check(code_import_exception, '(stdin)')
warnings = print_messages(warnings)
self.assertEqual(warnings, 0)

View file

@ -1,12 +1,57 @@
from __future__ import with_statement
import re
import os
import sys
from io import StringIO
import optparse
try:
# Python 2
from ConfigParser import ConfigParser
except ImportError:
# Python 3
from configparser import ConfigParser
pep8style = None
def get_parser():
"""Create a custom OptionParser"""
from flake8 import __version__
import flakey
import pep8
parser = pep8.get_parser()
def version(option, opt, value, parser):
parser.print_usage()
parser.print_version()
sys.exit(0)
parser.version = '{0} (pep8: {1}, flakey: {2})'.format(
__version__, pep8.__version__, flakey.__version__)
parser.remove_option('--version')
parser.add_option('--builtins', default='', dest='builtins',
help="append builtin functions to flakey's "
"_MAGIC_BUILTINS")
parser.add_option('--exit-zero', action='store_true', default=False,
help='Exit with status 0 even if there are errors')
parser.add_option('--max-complexity', default=-1, action='store',
type='int', help='McCabe complexity threshold')
parser.add_option('--install-hook', default=False, action='store_true',
help='Install the appropriate hook for this '
'repository.', dest='install_hook')
# don't overlap with pep8's verbose option
parser.add_option('-V', '--version', action='callback',
callback=version,
help='Print the version info for flake8')
return parser
def skip_warning(warning, ignore=[]):
# XXX quick dirty hack, just need to keep the line in the warning
if not hasattr(warning, 'message'):
if not hasattr(warning, 'message') or ignore is None:
# McCabe's warnings cannot be skipped afaik, and they're all strings.
# And we'll get a TypeError otherwise
return False
if warning.message.split()[0] in ignore:
return True
@ -27,16 +72,36 @@ def skip_line(line):
_NOQA = re.compile(r'flake8[:=]\s*noqa', re.I | re.M)
def skip_file(path):
def skip_file(path, source=None):
"""Returns True if this header is found in path
# flake8: noqa
"""
if not os.path.isfile(path):
if os.path.isfile(path):
f = open(path)
elif source:
f = StringIO(source)
else:
return False
f = open(path)
try:
content = f.read()
finally:
f.close()
return _NOQA.search(content) is not None
def _initpep8(config_file=True):
# default pep8 setup
global pep8style
import pep8
if pep8style is None:
pep8style = pep8.StyleGuide(config_file=config_file)
pep8style.options.physical_checks = pep8.find_checks('physical_line')
pep8style.options.logical_checks = pep8.find_checks('logical_line')
pep8style.options.counters = dict.fromkeys(pep8.BENCHMARK_KEYS, 0)
pep8style.options.messages = {}
if not pep8style.options.max_line_length:
pep8style.options.max_line_length = 79
pep8style.args = []
return pep8style

View file

@ -2,7 +2,7 @@ import sys
import os
ispy3 = sys.version_info[0] == 3
iswin = os.name == 'nt'
iswin = os.name == 'nt'
kwargs = {}
scripts = ["flake8/flake8"]
@ -14,9 +14,10 @@ else:
try:
from setuptools import setup # NOQA
kwargs = {
'entry_points':
{'distutils.commands': ['flake8 = flake8.run:Flake8Command'],
'console_scripts': ['flake8 = flake8.run:main']},
'entry_points': {
'distutils.commands': ['flake8 = flake8.main:Flake8Command'],
'console_scripts': ['flake8 = flake8.main:main']
},
'tests_require': ['nose'],
'test_suite': 'nose.collector',
}
@ -27,7 +28,7 @@ else:
from flake8 import __version__
README = open('README').read()
README = open('README.rst').read()
setup(
name="flake8",
@ -36,9 +37,12 @@ setup(
description="code checking using pep8 and pyflakes",
author="Tarek Ziade",
author_email="tarek@ziade.org",
maintainer="Ian Cordasco",
maintainer_email="graffatcolmingov@gmail.com",
url="http://bitbucket.org/tarek/flake8",
packages=["flake8", "flake8.tests"],
scripts=scripts,
install_requires=["flakey (==2.0)", "pep8 (==1.4.1)"],
long_description=README,
classifiers=[
"Environment :: Console",
@ -47,5 +51,6 @@ setup(
"Programming Language :: Python",
"Topic :: Software Development",
"Topic :: Utilities",
],
**kwargs)
],
**kwargs
)