split out file discovery and test it

This commit is contained in:
Anthony Sottile 2021-11-14 19:53:35 -08:00
parent c0ddae2948
commit 66071563c2
9 changed files with 291 additions and 217 deletions

View file

@ -60,24 +60,6 @@ Another helpful function that is named only to be explicit given it is a very
trivial check, this checks if the user specified ``-`` in their arguments to
|Flake8| to indicate we should read from stdin.
.. autofunction:: flake8.utils.filenames_from
When provided an argument to |Flake8|, we need to be able to traverse
directories in a convenient manner. For example, if someone runs
.. code::
$ flake8 flake8/
Then they want us to check all of the files in the directory ``flake8/``. This
function will handle that while also handling the case where they specify a
file like:
.. code::
$ flake8 flake8/__init__.py
.. autofunction:: flake8.utils.fnmatch
The standard library's :func:`fnmatch.fnmatch` is excellent at deciding if a

View file

@ -111,6 +111,8 @@ warn_unused_ignores = true
disallow_untyped_defs = true
[mypy-flake8.defaults]
disallow_untyped_defs = true
[mypy-flake8.discover_files]
disallow_untyped_defs = true
[mypy-flake8.exceptions]
disallow_untyped_defs = true
[mypy-flake8.formatting.*]

View file

@ -15,6 +15,7 @@ from flake8 import defaults
from flake8 import exceptions
from flake8 import processor
from flake8 import utils
from flake8.discover_files import expand_paths
Results = List[Tuple[str, int, int, str, Optional[str]]]
@ -155,70 +156,21 @@ class Manager:
)
return reported_results_count
def is_path_excluded(self, path: str) -> bool:
"""Check if a path is excluded.
:param str path:
Path to check against the exclude patterns.
:returns:
True if there are exclude patterns and the path matches,
otherwise False.
:rtype:
bool
"""
if path == "-":
if self.options.stdin_display_name == "stdin":
return False
path = self.options.stdin_display_name
return utils.matches_filename(
path,
patterns=self.exclude,
log_message='"%(path)s" has %(whether)sbeen excluded',
logger=LOG,
)
def make_checkers(self, paths: Optional[List[str]] = None) -> None:
"""Create checkers for each file."""
if paths is None:
paths = self.arguments
if not paths:
paths = ["."]
filename_patterns = self.options.filename
running_from_diff = self.options.diff
# NOTE(sigmavirus24): Yes this is a little unsightly, but it's our
# best solution right now.
def should_create_file_checker(filename, argument):
"""Determine if we should create a file checker."""
matches_filename_patterns = utils.fnmatch(
filename, filename_patterns
)
is_stdin = filename == "-"
# NOTE(sigmavirus24): If a user explicitly specifies something,
# e.g, ``flake8 bin/script`` then we should run Flake8 against
# that. Since should_create_file_checker looks to see if the
# filename patterns match the filename, we want to skip that in
# the event that the argument and the filename are identical.
# If it was specified explicitly, the user intended for it to be
# checked.
explicitly_provided = not running_from_diff and (
argument == filename
)
return (
explicitly_provided or matches_filename_patterns
) or is_stdin
checks = self.checks.to_dictionary()
self._all_checkers = [
FileChecker(filename, checks, self.options)
for argument in paths
for filename in utils.filenames_from(
argument, self.is_path_excluded
for filename in expand_paths(
paths=paths,
stdin_display_name=self.options.stdin_display_name,
filename_patterns=self.options.filename,
exclude=self.exclude,
is_running_from_diff=self.options.diff,
)
if should_create_file_checker(filename, argument)
]
self.checkers = [c for c in self._all_checkers if c.should_process]
LOG.info("Checking %d files", len(self.checkers))

View file

@ -0,0 +1,96 @@
"""Functions related to discovering paths."""
import logging
import os.path
from typing import Callable
from typing import Generator
from typing import Sequence
from flake8 import utils
LOG = logging.getLogger(__name__)
def _filenames_from(
arg: str,
*,
predicate: Callable[[str], bool],
) -> Generator[str, None, None]:
"""Generate filenames from an argument.
:param str arg:
Parameter from the command-line.
:param callable predicate:
Predicate to use to filter out filenames. If the predicate
returns ``True`` we will exclude the filename, otherwise we
will yield it. By default, we include every filename
generated.
:returns:
Generator of paths
"""
if predicate(arg):
return
if os.path.isdir(arg):
for root, sub_directories, files in os.walk(arg):
# NOTE(sigmavirus24): os.walk() will skip a directory if you
# remove it from the list of sub-directories.
for directory in tuple(sub_directories):
joined = os.path.join(root, directory)
if predicate(joined):
sub_directories.remove(directory)
for filename in files:
joined = os.path.join(root, filename)
if not predicate(joined):
yield joined
else:
yield arg
def expand_paths(
*,
paths: Sequence[str],
stdin_display_name: str,
filename_patterns: Sequence[str],
exclude: Sequence[str],
is_running_from_diff: bool,
) -> Generator[str, None, None]:
"""Expand out ``paths`` from commandline to the lintable files."""
if not paths:
paths = ["."]
def is_excluded(arg: str) -> bool:
if arg == "-":
# if the stdin_display_name is the default, always include it
if stdin_display_name == "stdin":
return False
arg = stdin_display_name
return utils.matches_filename(
arg,
patterns=exclude,
log_message='"%(path)s" has %(whether)sbeen excluded',
logger=LOG,
)
def is_included(arg: str, fname: str) -> bool:
# while running from a diff, the arguments aren't _explicitly_
# listed so we still filter them
if is_running_from_diff:
return utils.fnmatch(fname, filename_patterns)
else:
return (
# always lint `-`
fname == "-"
# always lint explicitly passed (even if not matching filter)
or arg == fname
# otherwise, check the file against filtered patterns
or utils.fnmatch(fname, filename_patterns)
)
return (
filename
for path in paths
for filename in _filenames_from(path, predicate=is_excluded)
if is_included(path, filename)
)

View file

@ -11,9 +11,7 @@ import re
import sys
import textwrap
import tokenize
from typing import Callable
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Pattern
@ -294,52 +292,6 @@ def is_using_stdin(paths: List[str]) -> bool:
return "-" in paths
def _default_predicate(*args: str) -> bool:
return False
def filenames_from(
arg: str, predicate: Optional[Callable[[str], bool]] = None
) -> Generator[str, None, None]:
"""Generate filenames from an argument.
:param str arg:
Parameter from the command-line.
:param callable predicate:
Predicate to use to filter out filenames. If the predicate
returns ``True`` we will exclude the filename, otherwise we
will yield it. By default, we include every filename
generated.
:returns:
Generator of paths
"""
if predicate is None:
predicate = _default_predicate
if predicate(arg):
return
if os.path.isdir(arg):
for root, sub_directories, files in os.walk(arg):
if predicate(root):
sub_directories[:] = []
continue
# NOTE(sigmavirus24): os.walk() will skip a directory if you
# remove it from the list of sub-directories.
for directory in sub_directories:
joined = os.path.join(root, directory)
if predicate(joined):
sub_directories.remove(directory)
for filename in files:
joined = os.path.join(root, filename)
if not predicate(joined):
yield joined
else:
yield arg
def fnmatch(filename: str, patterns: Sequence[str]) -> bool:
"""Wrap :func:`fnmatch.fnmatch` to add some functionality.
@ -351,7 +303,7 @@ def fnmatch(filename: str, patterns: Sequence[str]) -> bool:
The default value if patterns is empty
:returns:
True if a pattern matches the filename, False if it doesn't.
``default`` if patterns is empty.
``True`` if patterns is empty.
"""
if not patterns:
return True

View file

@ -85,11 +85,9 @@ def test_make_checkers(_):
}
manager = checker.Manager(style_guide, files, checkplugins)
with mock.patch("flake8.utils.filenames_from") as filenames_from:
filenames_from.side_effect = [["file1"], ["file2"]]
with mock.patch("flake8.utils.fnmatch", return_value=True):
with mock.patch("flake8.processor.FileProcessor"):
manager.make_checkers()
with mock.patch("flake8.utils.fnmatch", return_value=True):
with mock.patch("flake8.processor.FileProcessor"):
manager.make_checkers(["file1", "file2"])
assert manager._all_checkers
for file_checker in manager._all_checkers:

View file

@ -0,0 +1,174 @@
import os.path
import pytest
from flake8 import utils
from flake8.discover_files import _filenames_from
from flake8.discover_files import expand_paths
@pytest.fixture
def files_dir(tmpdir):
"""Create test dir for testing filenames_from."""
with tmpdir.as_cwd():
tmpdir.join("a/b/c.py").ensure()
tmpdir.join("a/b/d.py").ensure()
tmpdir.join("a/b/e/f.py").ensure()
yield tmpdir
def _noop(path):
return False
def _normpath(s):
return s.replace("/", os.sep)
def _normpaths(pths):
return {_normpath(pth) for pth in pths}
@pytest.mark.usefixtures("files_dir")
def test_filenames_from_a_directory():
"""Verify that filenames_from walks a directory."""
filenames = set(_filenames_from(_normpath("a/b/"), predicate=_noop))
# should include all files
expected = _normpaths(("a/b/c.py", "a/b/d.py", "a/b/e/f.py"))
assert filenames == expected
@pytest.mark.usefixtures("files_dir")
def test_filenames_from_a_directory_with_a_predicate():
"""Verify that predicates filter filenames_from."""
filenames = set(
_filenames_from(
arg=_normpath("a/b/"),
predicate=lambda path: path.endswith(_normpath("b/c.py")),
)
)
# should not include c.py
expected = _normpaths(("a/b/d.py", "a/b/e/f.py"))
assert filenames == expected
@pytest.mark.usefixtures("files_dir")
def test_filenames_from_a_directory_with_a_predicate_from_the_current_dir():
"""Verify that predicates filter filenames_from."""
filenames = set(
_filenames_from(
arg=_normpath("./a/b"),
predicate=lambda path: path == "c.py",
)
)
# none should have matched the predicate so all returned
expected = _normpaths(("./a/b/c.py", "./a/b/d.py", "./a/b/e/f.py"))
assert filenames == expected
@pytest.mark.usefixtures("files_dir")
def test_filenames_from_a_single_file():
"""Verify that we simply yield that filename."""
filenames = set(_filenames_from(_normpath("a/b/c.py"), predicate=_noop))
assert filenames == {_normpath("a/b/c.py")}
def test_filenames_from_a_single_file_does_not_exist():
"""Verify that a passed filename which does not exist is returned back."""
filenames = set(_filenames_from(_normpath("d/n/e.py"), predicate=_noop))
assert filenames == {_normpath("d/n/e.py")}
def test_filenames_from_exclude_doesnt_exclude_directory_names(tmpdir):
"""Verify that we don't greedily exclude subdirs."""
tmpdir.join("1/dont_return_me.py").ensure()
tmpdir.join("2/1/return_me.py").ensure()
exclude = [tmpdir.join("1").strpath]
def predicate(pth):
return utils.fnmatch(os.path.abspath(pth), exclude)
with tmpdir.as_cwd():
filenames = list(_filenames_from(".", predicate=predicate))
assert filenames == [os.path.join(".", "2", "1", "return_me.py")]
def test_filenames_from_predicate_applies_to_initial_arg(tmp_path):
"""Test that the predicate is also applied to the passed argument."""
fname = str(tmp_path.joinpath("f.py"))
ret = tuple(_filenames_from(fname, predicate=lambda _: True))
assert ret == ()
def test_filenames_from_predicate_applies_to_dirname(tmp_path):
"""Test that the predicate can filter whole directories."""
a_dir = tmp_path.joinpath("a")
a_dir.mkdir()
a_dir.joinpath("b.py").touch()
b_py = tmp_path.joinpath("b.py")
b_py.touch()
def predicate(p):
# filter out the /a directory
return p.endswith("a")
ret = tuple(_filenames_from(str(tmp_path), predicate=predicate))
assert ret == (str(b_py),)
def _expand_paths(
*,
paths=(".",),
stdin_display_name="stdin",
filename_patterns=("*.py",),
exclude=(),
is_running_from_diff=False,
):
return set(
expand_paths(
paths=paths,
stdin_display_name=stdin_display_name,
filename_patterns=filename_patterns,
exclude=exclude,
is_running_from_diff=is_running_from_diff,
)
)
@pytest.mark.usefixtures("files_dir")
def test_expand_paths_honors_exclude():
expected = _normpaths(("./a/b/c.py", "./a/b/e/f.py"))
assert _expand_paths(exclude=["d.py"]) == expected
@pytest.mark.usefixtures("files_dir")
def test_expand_paths_defaults_to_dot():
expected = _normpaths(("./a/b/c.py", "./a/b/d.py", "./a/b/e/f.py"))
assert _expand_paths(paths=()) == expected
def test_default_stdin_name_is_not_filtered():
assert _expand_paths(paths=("-",)) == {"-"}
def test_alternate_stdin_name_is_filtered():
ret = _expand_paths(
paths=("-",),
stdin_display_name="wat",
exclude=("wat",),
)
assert ret == set()
def test_filename_included_even_if_not_matching_include(tmp_path):
some_file = str(tmp_path.joinpath("some/file"))
assert _expand_paths(paths=(some_file,)) == {some_file}
def test_diff_filenames_filtered_by_patterns(tmp_path):
f1 = str(tmp_path.joinpath("f1"))
f2 = str(tmp_path.joinpath("f2.py"))
ret = _expand_paths(paths=(f1, f2), is_running_from_diff=True)
assert ret == {f2}

View file

@ -160,6 +160,13 @@ def test_normalize_paths(value, expected):
assert utils.normalize_paths(value) == expected
def test_matches_filename_for_excluding_dotfiles():
"""Verify that `.` and `..` are not matched by `.*`."""
logger = logging.Logger(__name__)
assert not utils.matches_filename(".", (".*",), "", logger)
assert not utils.matches_filename("..", (".*",), "", logger)
@pytest.mark.parametrize(
"filename,patterns,expected",
[
@ -174,89 +181,6 @@ def test_fnmatch(filename, patterns, expected):
assert utils.fnmatch(filename, patterns) is expected
@pytest.fixture
def files_dir(tmpdir):
"""Create test dir for testing filenames_from."""
with tmpdir.as_cwd():
tmpdir.join("a/b/c.py").ensure()
tmpdir.join("a/b/d.py").ensure()
tmpdir.join("a/b/e/f.py").ensure()
yield tmpdir
def _normpath(s):
return s.replace("/", os.sep)
def _normpaths(pths):
return {_normpath(pth) for pth in pths}
@pytest.mark.usefixtures("files_dir")
def test_filenames_from_a_directory():
"""Verify that filenames_from walks a directory."""
filenames = set(utils.filenames_from(_normpath("a/b/")))
# should include all files
expected = _normpaths(("a/b/c.py", "a/b/d.py", "a/b/e/f.py"))
assert filenames == expected
@pytest.mark.usefixtures("files_dir")
def test_filenames_from_a_directory_with_a_predicate():
"""Verify that predicates filter filenames_from."""
filenames = set(
utils.filenames_from(
arg=_normpath("a/b/"),
predicate=lambda path: path.endswith(_normpath("b/c.py")),
)
)
# should not include c.py
expected = _normpaths(("a/b/d.py", "a/b/e/f.py"))
assert filenames == expected
@pytest.mark.usefixtures("files_dir")
def test_filenames_from_a_directory_with_a_predicate_from_the_current_dir():
"""Verify that predicates filter filenames_from."""
filenames = set(
utils.filenames_from(
arg=_normpath("./a/b"),
predicate=lambda path: path == "c.py",
)
)
# none should have matched the predicate so all returned
expected = _normpaths(("./a/b/c.py", "./a/b/d.py", "./a/b/e/f.py"))
assert filenames == expected
@pytest.mark.usefixtures("files_dir")
def test_filenames_from_a_single_file():
"""Verify that we simply yield that filename."""
filenames = set(utils.filenames_from(_normpath("a/b/c.py")))
assert filenames == {_normpath("a/b/c.py")}
def test_filenames_from_a_single_file_does_not_exist():
"""Verify that a passed filename which does not exist is returned back."""
filenames = set(utils.filenames_from(_normpath("d/n/e.py")))
assert filenames == {_normpath("d/n/e.py")}
def test_filenames_from_exclude_doesnt_exclude_directory_names(tmpdir):
"""Verify that we don't greedily exclude subdirs."""
tmpdir.join("1").ensure_dir().join("dont_return_me.py").ensure()
tmpdir.join("2").join("1").ensure_dir().join("return_me.py").ensure()
exclude = [tmpdir.join("1").strpath]
# This acts similar to src.flake8.checker.is_path_excluded
def predicate(pth):
return utils.fnmatch(os.path.abspath(pth), exclude)
with tmpdir.as_cwd():
filenames = list(utils.filenames_from(".", predicate))
assert filenames == [os.path.join(".", "2", "1", "return_me.py")]
def test_parameters_for_class_plugin():
"""Verify that we can retrieve the parameters for a class plugin."""
@ -323,13 +247,6 @@ def test_parse_unified_diff(diff, parsed_diff):
assert utils.parse_unified_diff(diff) == parsed_diff
def test_matches_filename_for_excluding_dotfiles():
"""Verify that `.` and `..` are not matched by `.*`."""
logger = logging.Logger(__name__)
assert not utils.matches_filename(".", (".*",), "", logger)
assert not utils.matches_filename("..", (".*",), "", logger)
def test_stdin_get_value_crlf():
"""Ensure that stdin is normalized from crlf to lf."""
stdin = io.TextIOWrapper(io.BytesIO(b"1\r\n2\r\n"), "UTF-8")

View file

@ -122,4 +122,5 @@ commands =
extend-ignore = E203
per-file-ignores =
src/flake8/formatting/_windows_color.py: N806
tests/*: D
max-complexity = 10