Improve error handling and clarity in catch_dotenv hook and tests

This commit is contained in:
Chris Rowe 2025-08-28 20:29:12 -06:00
parent 33746f52ec
commit 989ac68f29
No known key found for this signature in database
3 changed files with 138 additions and 23 deletions

View file

@ -19,22 +19,30 @@ _KEY_REGEX = re.compile(r"^\s*(?:export\s+)?([A-Za-z_][A-Za-z0-9_]*)\s*=")
def _atomic_write(path: str, data: str) -> None:
"""Atomic-ish text write: write to same-dir temp then os.replace."""
"""Atomically (best-effort) write text.
Writes to a same-directory temporary file then replaces the target with
os.replace(). This is a slight divergence from most existing hooks which
write directly, but here we intentionally reduce the (small) risk of
partially-written files because the hook may be invoked rapidly / in
parallel (tests exercise concurrent normalization). Keeping this helper
local avoids adding any dependency.
"""
fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
try:
with os.fdopen(fd, "w", encoding="utf-8", newline="") as tmp_f:
tmp_f.write(data)
os.replace(tmp_path, path)
finally: # Clean up if replace failed
if os.path.exists(tmp_path): # pragma: no cover (rare failure case)
if os.path.exists(tmp_path): # (rare failure case)
try:
os.remove(tmp_path)
except OSError: # pragma: no cover
except OSError:
pass
def ensure_env_in_gitignore(env_file: str, gitignore_file: str, banner: str) -> bool:
"""Normalize .gitignore tail (banner + env) collapsing duplicates. Returns True if modified."""
def _read_gitignore(gitignore_file: str) -> tuple[str, list[str]]:
"""Read and parse .gitignore file content."""
try:
if os.path.exists(gitignore_file):
with open(gitignore_file, "r", encoding="utf-8") as f:
@ -45,14 +53,17 @@ def ensure_env_in_gitignore(env_file: str, gitignore_file: str, banner: str) ->
lines = []
except OSError as exc:
print(f"ERROR: unable to read {gitignore_file}: {exc}", file=sys.stderr)
return False
original_content_str = original_text if lines else "" # post-read snapshot
raise
return original_text if lines else "", lines
def _normalize_gitignore_lines(lines: list[str], env_file: str, banner: str) -> list[str]:
"""Normalize .gitignore lines by removing duplicates and adding canonical tail."""
# Trim trailing blank lines
while lines and not lines[-1].strip():
lines.pop()
# Remove existing occurrences (exact match after strip)
# Remove existing occurrences
filtered: list[str] = [ln for ln in lines if ln.strip() not in {env_file, banner}]
if filtered and filtered[-1].strip():
@ -62,14 +73,35 @@ def ensure_env_in_gitignore(env_file: str, gitignore_file: str, banner: str) ->
filtered.append(banner)
filtered.append(env_file)
return filtered
new_content = "\n".join(filtered) + "\n"
if new_content == (original_content_str if original_content_str.endswith("\n") else original_content_str + ("" if not original_content_str else "\n")):
def ensure_env_in_gitignore(env_file: str, gitignore_file: str, banner: str) -> bool:
"""Ensure canonical banner + env tail in .gitignore.
Returns True only when the file content was changed. Returns False both
when unchanged and on IO errors (we intentionally conflate for the simple
hook contract; errors are still surfaced via stderr output).
"""
try:
original_content_str, lines = _read_gitignore(gitignore_file)
except OSError:
return False
filtered = _normalize_gitignore_lines(lines, env_file, banner)
new_content = "\n".join(filtered) + "\n"
# Normalize original content to a single trailing newline for comparison
normalized_original = original_content_str
if normalized_original and not normalized_original.endswith("\n"):
normalized_original += "\n"
if new_content == normalized_original:
return False
try:
_atomic_write(gitignore_file, new_content)
return True
except OSError as exc: # pragma: no cover
except OSError as exc:
print(f"ERROR: unable to write {gitignore_file}: {exc}", file=sys.stderr)
return False
@ -107,7 +139,7 @@ def create_example_env(src_env: str, example_file: str) -> bool:
_atomic_write(example_file, "\n".join(header + body) + "\n")
return True
except OSError as exc: # pragma: no cover
print(f"ERROR: unable to write '{example_file}': {exc}")
print(f"ERROR: unable to write '{example_file}': {exc}", file=sys.stderr)
return False
@ -116,26 +148,25 @@ def _has_env(filenames: Iterable[str], env_file: str) -> bool:
return any(os.path.basename(name) == env_file for name in filenames)
def _print_failure(env_file: str, gitignore_file: str, example_created: bool, gitignore_modified: bool) -> None:
parts: list[str] = [f"Blocked committing {env_file}."]
# Match typical hook output style: one short line per action.
print(f"Blocked committing {env_file}.")
if gitignore_modified:
parts.append(f"Updated {gitignore_file}.")
print(f"Updated {gitignore_file}.")
if example_created:
parts.append("Generated .env.example.")
parts.append(f"Remove {env_file} from the commit and retry.")
print(" ".join(parts))
print("Generated .env.example.")
print(f"Remove {env_file} from the commit and retry.")
def main(argv: Sequence[str] | None = None) -> int:
"""Hook entry-point."""
parser = argparse.ArgumentParser(description="Block committing environment files (.env).")
parser = argparse.ArgumentParser(description="Blocks committing .env files.")
parser.add_argument('filenames', nargs='*', help='Staged filenames (supplied by pre-commit).')
parser.add_argument('--create-example', action='store_true', help='Generate example env file (.env.example).')
args = parser.parse_args(argv)
env_file = DEFAULT_ENV_FILE
# Use current working directory as repository root (simplified; no ascent)
# Use current working directory as repository root (pre-commit executes
# hooks from the repo root).
repo_root = os.getcwd()
gitignore_file = os.path.join(repo_root, DEFAULT_GITIGNORE_FILE)
example_file = os.path.join(repo_root, DEFAULT_EXAMPLE_ENV_FILE)