Refactor catch_dotenv hook and tests for improved error handling and clarity

This commit is contained in:
Chris Rowe 2025-08-28 20:09:17 -06:00
parent 25a3d2ea3f
commit 33746f52ec
No known key found for this signature in database
2 changed files with 31 additions and 54 deletions

View file

@ -4,12 +4,13 @@ from __future__ import annotations
import argparse import argparse
import os import os
import re import re
import sys
import tempfile import tempfile
from collections.abc import Sequence from collections.abc import Sequence
from typing import Iterable from typing import Iterable
# --- Defaults / Constants --- # Defaults / constants
DEFAULT_ENV_FILE = ".env" # Canonical env file name DEFAULT_ENV_FILE = ".env"
DEFAULT_GITIGNORE_FILE = ".gitignore" DEFAULT_GITIGNORE_FILE = ".gitignore"
DEFAULT_EXAMPLE_ENV_FILE = ".env.example" DEFAULT_EXAMPLE_ENV_FILE = ".env.example"
GITIGNORE_BANNER = "# Added by pre-commit hook to prevent committing secrets" GITIGNORE_BANNER = "# Added by pre-commit hook to prevent committing secrets"
@ -18,8 +19,7 @@ _KEY_REGEX = re.compile(r"^\s*(?:export\s+)?([A-Za-z_][A-Za-z0-9_]*)\s*=")
def _atomic_write(path: str, data: str) -> None: def _atomic_write(path: str, data: str) -> None:
"""Write text to path atomically (best-effort).""" """Atomic-ish text write: write to same-dir temp then os.replace."""
# Using same directory for atomic os.replace semantics on POSIX.
fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".") fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
try: try:
with os.fdopen(fd, "w", encoding="utf-8", newline="") as tmp_f: with os.fdopen(fd, "w", encoding="utf-8", newline="") as tmp_f:
@ -34,24 +34,19 @@ def _atomic_write(path: str, data: str) -> None:
def ensure_env_in_gitignore(env_file: str, gitignore_file: str, banner: str) -> bool: def ensure_env_in_gitignore(env_file: str, gitignore_file: str, banner: str) -> bool:
"""Normalize `.gitignore` so it contains exactly one banner + env line at end. """Normalize .gitignore tail (banner + env) collapsing duplicates. Returns True if modified."""
Returns True if the file was created or its contents changed, False otherwise.
Strategy: read existing lines, strip trailing blanks, remove any prior occurrences of
the banner or env_file (even if duplicated), then append a single blank line,
banner, and env_file. Produces an idempotent final layout.
"""
try: try:
if os.path.exists(gitignore_file): if os.path.exists(gitignore_file):
with open(gitignore_file, "r", encoding="utf-8") as f: with open(gitignore_file, "r", encoding="utf-8") as f:
lines = f.read().splitlines() original_text = f.read()
lines = original_text.splitlines()
else: else:
original_text = ""
lines = [] lines = []
except OSError as exc: except OSError as exc:
print(f"ERROR: unable to read '{gitignore_file}': {exc}") print(f"ERROR: unable to read {gitignore_file}: {exc}", file=sys.stderr)
return False return False
original_content_str = original_text if lines else "" # post-read snapshot
original = list(lines)
# Trim trailing blank lines # Trim trailing blank lines
while lines and not lines[-1].strip(): while lines and not lines[-1].strip():
@ -69,27 +64,23 @@ def ensure_env_in_gitignore(env_file: str, gitignore_file: str, banner: str) ->
filtered.append(env_file) filtered.append(env_file)
new_content = "\n".join(filtered) + "\n" new_content = "\n".join(filtered) + "\n"
if original == filtered: if new_content == (original_content_str if original_content_str.endswith("\n") else original_content_str + ("" if not original_content_str else "\n")):
return False return False
try: try:
_atomic_write(gitignore_file, new_content) _atomic_write(gitignore_file, new_content)
return True return True
except OSError as exc: # pragma: no cover except OSError as exc: # pragma: no cover
print(f"ERROR: unable to write '{gitignore_file}': {exc}") print(f"ERROR: unable to write {gitignore_file}: {exc}", file=sys.stderr)
return False return False
def create_example_env(src_env: str, example_file: str) -> bool: def create_example_env(src_env: str, example_file: str) -> bool:
"""Write example file containing only variable keys from real env file. """Generate .env.example with unique KEY= lines (no values)."""
Returns True if file written (or updated), False on read/write error.
Lines accepted: optional 'export ' prefix then KEY=...; ignores comments & duplicates.
"""
try: try:
with open(src_env, "r", encoding="utf-8") as f_env: with open(src_env, "r", encoding="utf-8") as f_env:
lines = f_env.readlines() lines = f_env.readlines()
except OSError as exc: except OSError as exc:
print(f"ERROR: unable to read '{src_env}': {exc}") print(f"ERROR: unable to read {src_env}: {exc}", file=sys.stderr)
return False return False
seen: set[str] = set() seen: set[str] = set()
@ -125,41 +116,27 @@ def _has_env(filenames: Iterable[str], env_file: str) -> bool:
return any(os.path.basename(name) == env_file for name in filenames) return any(os.path.basename(name) == env_file for name in filenames)
def _find_repo_root(start: str = '.') -> str:
"""Ascend from start until a directory containing '.git' is found.
Falls back to absolute path of start if no parent contains '.git'. This mirrors
typical pre-commit execution (already at repo root) but makes behavior stable
when hook is invoked from a subdirectory (e.g. for direct adhoc testing).
"""
cur = os.path.abspath(start)
prev = None
while cur != prev:
if os.path.isdir(os.path.join(cur, '.git')):
return cur
prev, cur = cur, os.path.abspath(os.path.join(cur, os.pardir))
return os.path.abspath(start)
def _print_failure(env_file: str, gitignore_file: str, example_created: bool, gitignore_modified: bool) -> None: def _print_failure(env_file: str, gitignore_file: str, example_created: bool, gitignore_modified: bool) -> None:
parts: list[str] = [f"Blocked committing '{env_file}'."] parts: list[str] = [f"Blocked committing {env_file}."]
if gitignore_modified: if gitignore_modified:
parts.append(f"Added to '{gitignore_file}'.") parts.append(f"Updated {gitignore_file}.")
if example_created: if example_created:
parts.append("Example file generated.") parts.append("Generated .env.example.")
parts.append(f"Remove '{env_file}' from the commit and commit again.") parts.append(f"Remove {env_file} from the commit and retry.")
print(" ".join(parts)) print(" ".join(parts))
def main(argv: Sequence[str] | None = None) -> int: def main(argv: Sequence[str] | None = None) -> int:
"""Main function for the pre-commit hook.""" """Hook entry-point."""
parser = argparse.ArgumentParser(description="Block committing environment files (.env).") parser = argparse.ArgumentParser(description="Block committing environment files (.env).")
parser.add_argument('filenames', nargs='*', help='Staged filenames (supplied by pre-commit).') parser.add_argument('filenames', nargs='*', help='Staged filenames (supplied by pre-commit).')
parser.add_argument('--create-example', action='store_true', help='Generate example env file (.env.example).') parser.add_argument('--create-example', action='store_true', help='Generate example env file (.env.example).')
args = parser.parse_args(argv) args = parser.parse_args(argv)
env_file = DEFAULT_ENV_FILE env_file = DEFAULT_ENV_FILE
# Resolve repository root (directory containing .git) so writes happen there # Use current working directory as repository root (simplified; no ascent)
repo_root = _find_repo_root('.') repo_root = os.getcwd()
gitignore_file = os.path.join(repo_root, DEFAULT_GITIGNORE_FILE) gitignore_file = os.path.join(repo_root, DEFAULT_GITIGNORE_FILE)
example_file = os.path.join(repo_root, DEFAULT_EXAMPLE_ENV_FILE) example_file = os.path.join(repo_root, DEFAULT_EXAMPLE_ENV_FILE)
env_abspath = os.path.join(repo_root, env_file) env_abspath = os.path.join(repo_root, env_file)

View file

@ -171,9 +171,9 @@ def test_failure_message_content(tmp_path: Path, env_file: Path, capsys):
assert ret == 1 assert ret == 1
out = capsys.readouterr().out.strip() out = capsys.readouterr().out.strip()
assert "Blocked committing" in out assert "Blocked committing" in out
assert DEFAULT_GITIGNORE_FILE in out assert DEFAULT_GITIGNORE_FILE in out # updated path appears
assert "Example file generated" in out assert "Generated .env.example." in out
assert "Remove '.env'" in out assert "Remove .env" in out
def test_create_example_when_env_missing(tmp_path: Path, env_file: Path): def test_create_example_when_env_missing(tmp_path: Path, env_file: Path):
@ -194,8 +194,8 @@ def test_gitignore_is_directory_error(tmp_path: Path, env_file: Path, capsys):
gitignore_dir.mkdir() gitignore_dir.mkdir()
ret = run_hook(tmp_path, [DEFAULT_ENV_FILE]) ret = run_hook(tmp_path, [DEFAULT_ENV_FILE])
assert ret == 1 # still blocks commit assert ret == 1 # still blocks commit
out = capsys.readouterr().out captured = capsys.readouterr()
assert "ERROR:" in out # read failure logged assert "ERROR:" in captured.err # error now printed to stderr
def test_env_example_overwrites_existing(tmp_path: Path, env_file: Path): def test_env_example_overwrites_existing(tmp_path: Path, env_file: Path):
@ -268,7 +268,7 @@ def test_already_ignored_env_with_variations(tmp_path: Path, env_file: Path):
def test_subdirectory_invocation(tmp_path: Path, env_file: Path): def test_subdirectory_invocation(tmp_path: Path, env_file: Path):
"""Running from a subdirectory still writes gitignore/example at repo root.""" """Running from a subdirectory now writes .gitignore relative to CWD (simplified behavior)."""
sub = tmp_path / 'subdir' sub = tmp_path / 'subdir'
sub.mkdir() sub.mkdir()
# simulate repository root marker # simulate repository root marker
@ -277,8 +277,8 @@ def test_subdirectory_invocation(tmp_path: Path, env_file: Path):
cwd = os.getcwd() cwd = os.getcwd()
os.chdir(sub) os.chdir(sub)
try: try:
ret = main(['../' + DEFAULT_ENV_FILE]) ret = main(['../' + DEFAULT_ENV_FILE]) # staged path relative to subdir
gi = (tmp_path / DEFAULT_GITIGNORE_FILE).read_text().splitlines() gi = (sub / DEFAULT_GITIGNORE_FILE).read_text().splitlines()
finally: finally:
os.chdir(cwd) os.chdir(cwd)
assert ret == 1 assert ret == 1
@ -292,8 +292,8 @@ def test_atomic_write_failure_gitignore(monkeypatch, tmp_path: Path, env_file: P
monkeypatch.setattr('pre_commit_hooks.catch_dotenv.os.replace', boom) monkeypatch.setattr('pre_commit_hooks.catch_dotenv.os.replace', boom)
modified = ensure_env_in_gitignore(DEFAULT_ENV_FILE, str(tmp_path / DEFAULT_GITIGNORE_FILE), GITIGNORE_BANNER) modified = ensure_env_in_gitignore(DEFAULT_ENV_FILE, str(tmp_path / DEFAULT_GITIGNORE_FILE), GITIGNORE_BANNER)
assert modified is False assert modified is False
out = capsys.readouterr().out captured = capsys.readouterr()
assert 'ERROR: unable to write' in out assert 'ERROR: unable to write' in captured.err
def test_atomic_write_failure_example(monkeypatch, tmp_path: Path, env_file: Path, capsys): def test_atomic_write_failure_example(monkeypatch, tmp_path: Path, env_file: Path, capsys):