add __import__("pdb") handling

This commit is contained in:
Alessio Izzo 2023-07-05 23:50:20 +02:00
parent 950e8063e1
commit 9b67aa469d
No known key found for this signature in database
GPG key ID: 6EDA81DCD05A689B
2 changed files with 58 additions and 36 deletions

View file

@ -8,15 +8,15 @@ from typing import Sequence
DEBUG_STATEMENTS = { DEBUG_STATEMENTS = {
'ipdb', "ipdb",
'pdb', "pdb",
'pdbr', "pdbr",
'pudb', "pudb",
'pydevd_pycharm', "pydevd_pycharm",
'q', "q",
'rdb', "rdb",
'rpdb', "rpdb",
'wdb', "wdb",
} }
@ -34,30 +34,39 @@ class DebugStatementParser(ast.NodeVisitor):
def visit_Import(self, node: ast.Import) -> None: def visit_Import(self, node: ast.Import) -> None:
for name in node.names: for name in node.names:
if name.name in DEBUG_STATEMENTS: if name.name in DEBUG_STATEMENTS:
st = Debug(node.lineno, node.col_offset, name.name, 'imported') st = Debug(node.lineno, node.col_offset, name.name, "imported")
self.breakpoints.append(st) self.breakpoints.append(st)
def visit_ImportFrom(self, node: ast.ImportFrom) -> None: def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
if node.module in DEBUG_STATEMENTS: if node.module in DEBUG_STATEMENTS:
st = Debug(node.lineno, node.col_offset, node.module, 'imported') st = Debug(node.lineno, node.col_offset, node.module, "imported")
self.breakpoints.append(st) self.breakpoints.append(st)
def visit_Call(self, node: ast.Call) -> None: def visit_Call(self, node: ast.Call) -> None:
"""python3.7+ breakpoint()""" """python3.7+ breakpoint() and __import__"""
if isinstance(node.func, ast.Name) and node.func.id == 'breakpoint': if isinstance(node.func, ast.Name):
st = Debug(node.lineno, node.col_offset, node.func.id, 'called') if node.func.id == "breakpoint":
self.breakpoints.append(st) st = Debug(node.lineno, node.col_offset, node.func.id, "called")
self.breakpoints.append(st)
if (
node.func.id == "__import__"
and isinstance(node.args[0], ast.Constant)
and len(node.args) > 0
and node.args[0].value in DEBUG_STATEMENTS
):
st = Debug(node.lineno, node.col_offset, node.args[0].value, "imported")
self.breakpoints.append(st)
self.generic_visit(node) self.generic_visit(node)
def check_file(filename: str) -> int: def check_file(filename: str) -> int:
try: try:
with open(filename, 'rb') as f: with open(filename, "rb") as f:
ast_obj = ast.parse(f.read(), filename=filename) ast_obj = ast.parse(f.read(), filename=filename)
except SyntaxError: except SyntaxError:
print(f'{filename} - Could not parse ast') print(f"{filename} - Could not parse ast")
print() print()
print('\t' + traceback.format_exc().replace('\n', '\n\t')) print("\t" + traceback.format_exc().replace("\n", "\n\t"))
print() print()
return 1 return 1
@ -65,14 +74,14 @@ def check_file(filename: str) -> int:
visitor.visit(ast_obj) visitor.visit(ast_obj)
for bp in visitor.breakpoints: for bp in visitor.breakpoints:
print(f'{filename}:{bp.line}:{bp.col}: {bp.name} {bp.reason}') print(f"{filename}:{bp.line}:{bp.col}: {bp.name} {bp.reason}")
return int(bool(visitor.breakpoints)) return int(bool(visitor.breakpoints))
def main(argv: Sequence[str] | None = None) -> int: def main(argv: Sequence[str] | None = None) -> int:
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('filenames', nargs='*', help='Filenames to run') parser.add_argument("filenames", nargs="*", help="Filenames to run")
args = parser.parse_args(argv) args = parser.parse_args(argv)
retv = 0 retv = 0
@ -81,5 +90,5 @@ def main(argv: Sequence[str] | None = None) -> int:
return retv return retv
if __name__ == '__main__': if __name__ == "__main__":
raise SystemExit(main()) raise SystemExit(main())

View file

@ -10,31 +10,44 @@ from testing.util import get_resource_path
def test_no_breakpoints(): def test_no_breakpoints():
visitor = DebugStatementParser() visitor = DebugStatementParser()
visitor.visit(ast.parse('import os\nfrom foo import bar\n')) visitor.visit(ast.parse("import os\nfrom foo import bar\n"))
assert visitor.breakpoints == [] assert visitor.breakpoints == []
def test_finds_debug_import_attribute_access(): def test_finds_debug_import_attribute_access():
visitor = DebugStatementParser() visitor = DebugStatementParser()
visitor.visit(ast.parse('import ipdb; ipdb.set_trace()')) visitor.visit(ast.parse("import ipdb; ipdb.set_trace()"))
assert visitor.breakpoints == [Debug(1, 0, 'ipdb', 'imported')] assert visitor.breakpoints == [Debug(1, 0, "ipdb", "imported")]
def test_finds_debug_import_from_import(): def test_finds_debug_import_from_import():
visitor = DebugStatementParser() visitor = DebugStatementParser()
visitor.visit(ast.parse('from pudb import set_trace; set_trace()')) visitor.visit(ast.parse("from pudb import set_trace; set_trace()"))
assert visitor.breakpoints == [Debug(1, 0, 'pudb', 'imported')] assert visitor.breakpoints == [Debug(1, 0, "pudb", "imported")]
def test_finds_debug_import_from_dunder_import():
visitor = DebugStatementParser()
visitor.visit(ast.parse("__import__('pdb').set_trace()"))
assert visitor.breakpoints == [Debug(1, 0, "pdb", "imported")]
def test_finds_breakpoint(): def test_finds_breakpoint():
visitor = DebugStatementParser() visitor = DebugStatementParser()
visitor.visit(ast.parse('breakpoint()')) visitor.visit(ast.parse("breakpoint()"))
assert visitor.breakpoints == [Debug(1, 0, 'breakpoint', 'called')] assert visitor.breakpoints == [Debug(1, 0, "breakpoint", "called")]
def test_returns_one_for_failing_file(tmpdir): def test_returns_one_for_failing_file(tmpdir):
f_py = tmpdir.join('f.py') f_py = tmpdir.join("f.py")
f_py.write('def f():\n import pdb; pdb.set_trace()') f_py.write("def f():\n import pdb; pdb.set_trace()")
ret = main([str(f_py)])
assert ret == 1
def test_returns_one_for_failing_file_inline_dunder_import(tmpdir):
f_py = tmpdir.join("f.py")
f_py.write('def f():\n __import__("pdb").set_trace()')
ret = main([str(f_py)]) ret = main([str(f_py)])
assert ret == 1 assert ret == 1
@ -45,19 +58,19 @@ def test_returns_zero_for_passing_file():
def test_syntaxerror_file(): def test_syntaxerror_file():
ret = main([get_resource_path('cannot_parse_ast.notpy')]) ret = main([get_resource_path("cannot_parse_ast.notpy")])
assert ret == 1 assert ret == 1
def test_non_utf8_file(tmpdir): def test_non_utf8_file(tmpdir):
f_py = tmpdir.join('f.py') f_py = tmpdir.join("f.py")
f_py.write_binary('# -*- coding: cp1252 -*-\nx = ""\n'.encode('cp1252')) f_py.write_binary('# -*- coding: cp1252 -*-\nx = ""\n'.encode("cp1252"))
assert main((str(f_py),)) == 0 assert main((str(f_py),)) == 0
def test_py37_breakpoint(tmpdir, capsys): def test_py37_breakpoint(tmpdir, capsys):
f_py = tmpdir.join('f.py') f_py = tmpdir.join("f.py")
f_py.write('def f():\n breakpoint()\n') f_py.write("def f():\n breakpoint()\n")
assert main((str(f_py),)) == 1 assert main((str(f_py),)) == 1
out, _ = capsys.readouterr() out, _ = capsys.readouterr()
assert out == f'{f_py}:2:4: breakpoint called\n' assert out == f"{f_py}:2:4: breakpoint called\n"