mirror of
https://github.com/LearningCircuit/local-deep-research.git
synced 2026-06-15 19:46:56 +03:00
chore(hooks): require UtcDateTime in migrations too (#3523)
Tighten check-datetime-timezone so the UtcDateTime rule applies to both models and migrations. Supersedes the inverted approach in #3515, which tried to accept sa.DateTime(timezone=True) inside migrations. - Rewrite the AST walker: handle sa.Column / bare Column, positional type arg at any index, bare Column(UtcDateTime) without parens (the hook's own example), and ast.IfExp with both branches inspected independently so a violation in either arm is still flagged. - Anchor the path filter on src/local_deep_research/ to stop false-positives on tests/database/models/ and partial-name matches like database/models_backup/. - Update .pre-commit-config.yaml name/description and the stale CI_CD_INFRASTRUCTURE.md hook table entry. - Add tests/hooks/test_check_datetime_timezone.py with 20 cases: violations (models / migrations / conditional types / batch runs / bare names), allows (UtcDateTime with import, combo import order, empty / syntax-error files), and path-filter boundaries.
This commit is contained in:
@@ -111,11 +111,11 @@ repos:
|
||||
description: "Prevent usage of redundant get_setting_from_db_main_thread wrapper
|
||||
- use SettingsManager directly"
|
||||
- id: check-datetime-timezone
|
||||
name: Check DateTime columns have timezone
|
||||
name: Check DateTime columns use UtcDateTime
|
||||
entry: scripts/pre_commit/check_datetime_timezone.py
|
||||
language: script
|
||||
files: \.py$
|
||||
description: "Ensure all SQLAlchemy DateTime columns have timezone=True"
|
||||
description: "All DateTime columns (models and migrations) must use UtcDateTime from sqlalchemy_utc"
|
||||
- id: check-session-context-manager
|
||||
name: Check for try/finally session patterns
|
||||
entry: .pre-commit-hooks/check-session-context-manager.py
|
||||
|
||||
@@ -68,7 +68,7 @@ pre-commit install-hooks
|
||||
| `check-deprecated-db-connection` | Enforce per-user database connections |
|
||||
| `check-ldr-db-usage` | Prevent shared `ldr.db` usage |
|
||||
| `check-research-id-type` | `research_id` must be string/UUID, not int |
|
||||
| `check-datetime-timezone` | SQLAlchemy DateTime must have `timezone=True` |
|
||||
| `check-datetime-timezone` | All DateTime columns (models and migrations) must use `UtcDateTime` from `sqlalchemy_utc` |
|
||||
| `check-session-context-manager` | Require context managers for DB sessions |
|
||||
| `check-pathlib-usage` | Use `pathlib.Path` instead of `os.path` |
|
||||
| `check-no-external-resources` | No external CDN/resource references |
|
||||
|
||||
@@ -1,5 +1,23 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Pre-commit hook to ensure all datetime columns use UtcDateTime for SQLite compatibility."""
|
||||
"""Pre-commit hook: ensure DateTime columns in models and migrations use UtcDateTime.
|
||||
|
||||
Scans files under ``src/local_deep_research/database/models/`` and
|
||||
``src/local_deep_research/database/migrations/versions/`` for
|
||||
``Column(...)`` / ``sa.Column(...)`` calls whose type argument is a bare
|
||||
``DateTime`` (with or without ``timezone=True``). Flags them and hints at
|
||||
the ``UtcDateTime`` replacement from ``sqlalchemy_utc``.
|
||||
|
||||
Limitations (accepted gaps, not caught by this hook):
|
||||
- Raw SQL inside ``op.execute("... DATETIME ...")`` — the hook cannot
|
||||
parse SQL strings.
|
||||
- Type-alias indirection: ``dt = sa.DateTime(); sa.Column("x", dt)``.
|
||||
- Fully-qualified imports without the ``sa`` alias
|
||||
(e.g. ``import sqlalchemy; sqlalchemy.Column(...)``).
|
||||
- ``sa.TIMESTAMP`` columns.
|
||||
- Walrus expressions: ``Column((dt := DateTime()))`` wraps the call in
|
||||
``ast.NamedExpr``, which the helper does not traverse.
|
||||
- Import-order variations beyond the two hardcoded substring forms.
|
||||
"""
|
||||
|
||||
import ast
|
||||
import re
|
||||
@@ -8,6 +26,33 @@ from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
|
||||
|
||||
def _callable_name(func_node):
|
||||
"""Return the callable's short name regardless of ``X`` or ``sa.X`` form."""
|
||||
if isinstance(func_node, ast.Name):
|
||||
return func_node.id
|
||||
if isinstance(func_node, ast.Attribute):
|
||||
return func_node.attr
|
||||
return None
|
||||
|
||||
|
||||
def _resolve_type_arg(arg):
|
||||
"""Return list of ('call', Call) or ('name', str) entries for all
|
||||
type-like nodes in arg's subtree. Returns [] when arg is not a type
|
||||
reference.
|
||||
|
||||
For ast.IfExp, BOTH branches are included — returning only the
|
||||
first-resolved branch would silently pass a violation that lives
|
||||
in the other branch.
|
||||
"""
|
||||
if isinstance(arg, ast.Call):
|
||||
return [("call", arg)]
|
||||
if isinstance(arg, ast.Name) and arg.id in {"UtcDateTime", "DateTime"}:
|
||||
return [("name", arg.id)]
|
||||
if isinstance(arg, ast.IfExp):
|
||||
return _resolve_type_arg(arg.body) + _resolve_type_arg(arg.orelse)
|
||||
return []
|
||||
|
||||
|
||||
def check_datetime_columns(file_path: Path) -> List[Tuple[int, str, str]]:
|
||||
"""Check a Python file for DateTime columns that should use UtcDateTime.
|
||||
|
||||
@@ -23,57 +68,71 @@ def check_datetime_columns(file_path: Path) -> List[Tuple[int, str, str]]:
|
||||
print(f"Error reading {file_path}: {e}", file=sys.stderr)
|
||||
return violations
|
||||
|
||||
# Check if file imports UtcDateTime (if it uses any DateTime columns)
|
||||
has_utc_datetime_import = (
|
||||
"from sqlalchemy_utc import UtcDateTime" in content
|
||||
or "from sqlalchemy_utc import utcnow, UtcDateTime" in content
|
||||
)
|
||||
|
||||
# Parse the AST to find Column definitions with DateTime
|
||||
try:
|
||||
tree = ast.parse(content)
|
||||
except SyntaxError:
|
||||
# Not valid Python, skip
|
||||
return violations
|
||||
|
||||
fix_hint = (
|
||||
"Use UtcDateTime() instead of DateTime() — "
|
||||
"import: from sqlalchemy_utc import UtcDateTime"
|
||||
)
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.Call):
|
||||
# Check if this is a Column call
|
||||
if isinstance(node.func, ast.Name) and node.func.id == "Column":
|
||||
# Check if first argument is DateTime
|
||||
if node.args and isinstance(node.args[0], ast.Call):
|
||||
datetime_call = node.args[0]
|
||||
if (
|
||||
isinstance(datetime_call.func, ast.Name)
|
||||
and datetime_call.func.id == "DateTime"
|
||||
):
|
||||
# This should be UtcDateTime instead
|
||||
if not isinstance(node, ast.Call):
|
||||
continue
|
||||
if _callable_name(node.func) != "Column":
|
||||
continue
|
||||
|
||||
type_entries = []
|
||||
for arg in node.args:
|
||||
type_entries = _resolve_type_arg(arg)
|
||||
if type_entries:
|
||||
break
|
||||
|
||||
for kind, payload in type_entries:
|
||||
if kind == "call":
|
||||
inner_name = _callable_name(payload.func)
|
||||
if inner_name == "DateTime":
|
||||
line_num = node.lineno
|
||||
if 0 <= line_num - 1 < len(lines):
|
||||
violations.append(
|
||||
(line_num, lines[line_num - 1].strip(), fix_hint)
|
||||
)
|
||||
elif inner_name == "UtcDateTime":
|
||||
if not has_utc_datetime_import:
|
||||
line_num = node.lineno
|
||||
if 0 <= line_num - 1 < len(lines):
|
||||
violations.append(
|
||||
(
|
||||
line_num,
|
||||
lines[line_num - 1].strip(),
|
||||
"Use UtcDateTime instead of DateTime for SQLite compatibility",
|
||||
"Missing import: from sqlalchemy_utc import UtcDateTime",
|
||||
)
|
||||
)
|
||||
elif (
|
||||
isinstance(datetime_call.func, ast.Name)
|
||||
and datetime_call.func.id == "UtcDateTime"
|
||||
):
|
||||
# This is correct, but check if import exists
|
||||
if not has_utc_datetime_import:
|
||||
line_num = node.lineno
|
||||
if 0 <= line_num - 1 < len(lines):
|
||||
violations.append(
|
||||
(
|
||||
line_num,
|
||||
lines[line_num - 1].strip(),
|
||||
"Missing import: from sqlalchemy_utc import UtcDateTime",
|
||||
)
|
||||
)
|
||||
elif kind == "name":
|
||||
if payload == "DateTime":
|
||||
line_num = node.lineno
|
||||
if 0 <= line_num - 1 < len(lines):
|
||||
violations.append(
|
||||
(line_num, lines[line_num - 1].strip(), fix_hint)
|
||||
)
|
||||
elif payload == "UtcDateTime" and not has_utc_datetime_import:
|
||||
line_num = node.lineno
|
||||
if 0 <= line_num - 1 < len(lines):
|
||||
violations.append(
|
||||
(
|
||||
line_num,
|
||||
lines[line_num - 1].strip(),
|
||||
"Missing import: from sqlalchemy_utc import UtcDateTime",
|
||||
)
|
||||
)
|
||||
|
||||
# Also check for func.now() usage which should be utcnow()
|
||||
for i, line in enumerate(lines, 1):
|
||||
if "func.now()" in line and "Column" in line:
|
||||
violations.append(
|
||||
@@ -83,7 +142,6 @@ def check_datetime_columns(file_path: Path) -> List[Tuple[int, str, str]]:
|
||||
"Use utcnow() instead of func.now() for timezone-aware defaults",
|
||||
)
|
||||
)
|
||||
# Check for datetime.utcnow or datetime.now(UTC) in defaults
|
||||
if re.search(
|
||||
r"default\s*=\s*(lambda:\s*)?datetime\.(utcnow|now)", line
|
||||
):
|
||||
@@ -111,23 +169,31 @@ def main():
|
||||
for file_path_str in files_to_check:
|
||||
file_path = Path(file_path_str)
|
||||
|
||||
# Only check Python files in database/models directories
|
||||
if file_path.suffix == ".py" and (
|
||||
"database/models" in str(file_path) or "models" in file_path.parts
|
||||
):
|
||||
violations = check_datetime_columns(file_path)
|
||||
if violations:
|
||||
all_violations.append((file_path, violations))
|
||||
path_str = str(file_path)
|
||||
in_scope = file_path.suffix == ".py" and (
|
||||
"src/local_deep_research/database/models/" in path_str
|
||||
or "src/local_deep_research/database/migrations/versions/"
|
||||
in path_str
|
||||
)
|
||||
if not in_scope:
|
||||
continue
|
||||
|
||||
violations = check_datetime_columns(file_path)
|
||||
if violations:
|
||||
all_violations.append((file_path, violations))
|
||||
|
||||
if all_violations:
|
||||
print("\n❌ DateTime column issues found:\n")
|
||||
print("\nDateTime column issues found:\n")
|
||||
for file_path, violations in all_violations:
|
||||
print(f" {file_path}:")
|
||||
for line_num, line_content, error_msg in violations:
|
||||
print(f" Line {line_num}: {error_msg}")
|
||||
print(f" > {line_content}")
|
||||
print(
|
||||
"\n Fix: Use UtcDateTime from sqlalchemy_utc for all datetime columns"
|
||||
"\n Fix: use UtcDateTime from sqlalchemy_utc for all datetime columns"
|
||||
)
|
||||
print(
|
||||
" (applies to both database/models/ and database/migrations/versions/)"
|
||||
)
|
||||
print(" Example: ")
|
||||
print(" from sqlalchemy_utc import UtcDateTime, utcnow")
|
||||
|
||||
306
tests/hooks/test_check_datetime_timezone.py
Normal file
306
tests/hooks/test_check_datetime_timezone.py
Normal file
@@ -0,0 +1,306 @@
|
||||
"""
|
||||
Tests for the check-datetime-timezone pre-commit hook.
|
||||
|
||||
Verifies that the hook flags any ``Column(DateTime(...))`` /
|
||||
``sa.Column(..., DateTime(...))`` pattern in both model files under
|
||||
``src/local_deep_research/database/models/`` and migration files under
|
||||
``src/local_deep_research/database/migrations/versions/``, and allows
|
||||
``UtcDateTime`` when the ``sqlalchemy_utc`` import is present.
|
||||
|
||||
Notable regressions locked in here:
|
||||
|
||||
* No migration exemption for ``sa.DateTime(timezone=True)``
|
||||
(see ``test_no_migration_exemption_for_timezone_true``). This is the
|
||||
inversion of the superseded PR #3515 approach.
|
||||
* ``ast.IfExp`` branches are BOTH inspected — a violation in either
|
||||
``body`` or ``orelse`` is caught.
|
||||
* Path filter anchors on ``src/local_deep_research/`` so
|
||||
``tests/database/models/`` is NOT scanned.
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
HOOK_SCRIPT = (
|
||||
Path(__file__).parent.parent.parent
|
||||
/ "scripts"
|
||||
/ "pre_commit"
|
||||
/ "check_datetime_timezone.py"
|
||||
)
|
||||
|
||||
|
||||
PACKAGE_PREFIX = ("src", "local_deep_research")
|
||||
|
||||
|
||||
def _run_hook_on_source(
|
||||
source: str,
|
||||
subpath: tuple = PACKAGE_PREFIX + ("database", "models"),
|
||||
filename: str = "example.py",
|
||||
) -> subprocess.CompletedProcess:
|
||||
"""Write *source* into a directory tree matching *subpath* and run the hook."""
|
||||
with tempfile.TemporaryDirectory() as base:
|
||||
nested = Path(base).joinpath(*subpath)
|
||||
nested.mkdir(parents=True)
|
||||
target = nested / filename
|
||||
target.write_text(source)
|
||||
return subprocess.run(
|
||||
[sys.executable, str(HOOK_SCRIPT), str(target)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
|
||||
def _run_hook_on_files(
|
||||
files: list,
|
||||
) -> subprocess.CompletedProcess:
|
||||
"""Run the hook against multiple prepared files.
|
||||
|
||||
``files`` is a list of ``(subpath_tuple, filename, source)`` triples.
|
||||
Returns after running the hook once with all file paths as arguments.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as base:
|
||||
paths = []
|
||||
for subpath, filename, source in files:
|
||||
nested = Path(base).joinpath(*subpath)
|
||||
nested.mkdir(parents=True, exist_ok=True)
|
||||
target = nested / filename
|
||||
target.write_text(source)
|
||||
paths.append(str(target))
|
||||
return subprocess.run(
|
||||
[sys.executable, str(HOOK_SCRIPT), *paths],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
|
||||
MODEL_PATH = PACKAGE_PREFIX + ("database", "models")
|
||||
MIGRATION_PATH = PACKAGE_PREFIX + ("database", "migrations", "versions")
|
||||
OUT_OF_SCOPE_PATH = ("tests", "database", "models")
|
||||
|
||||
|
||||
IMPORT_LINE = "from sqlalchemy_utc import UtcDateTime\n"
|
||||
IMPORT_LINE_COMBO = "from sqlalchemy_utc import utcnow, UtcDateTime\n"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Flagged patterns
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFlagsViolations:
|
||||
"""Patterns that must exit with returncode 1."""
|
||||
|
||||
def test_model_bare_datetime_call_flagged(self):
|
||||
result = _run_hook_on_source(
|
||||
"ts = Column(DateTime(), nullable=False)\n",
|
||||
subpath=MODEL_PATH,
|
||||
)
|
||||
assert result.returncode == 1
|
||||
assert "UtcDateTime" in result.stdout
|
||||
|
||||
def test_model_bare_datetime_name_flagged(self):
|
||||
"""Bare ``Column(DateTime)`` without parens — new stricter handling."""
|
||||
result = _run_hook_on_source(
|
||||
"ts = Column(DateTime)\n",
|
||||
subpath=MODEL_PATH,
|
||||
)
|
||||
assert result.returncode == 1
|
||||
|
||||
def test_model_utcdatetime_missing_import_flagged(self):
|
||||
result = _run_hook_on_source(
|
||||
"ts = Column(UtcDateTime, nullable=False)\n",
|
||||
subpath=MODEL_PATH,
|
||||
)
|
||||
assert result.returncode == 1
|
||||
assert "Missing import" in result.stdout
|
||||
|
||||
def test_no_migration_exemption_for_timezone_true(self):
|
||||
"""PR #3515's migration exemption is intentionally removed.
|
||||
|
||||
A migration that uses ``sa.DateTime(timezone=True)`` is a
|
||||
violation under the ``UtcDateTime``-everywhere rule, even though
|
||||
the old hook behaviour accepted it.
|
||||
"""
|
||||
result = _run_hook_on_source(
|
||||
'created_at = sa.Column("created_at", sa.DateTime(timezone=True), nullable=False)\n',
|
||||
subpath=MIGRATION_PATH,
|
||||
filename="0006_example.py",
|
||||
)
|
||||
assert result.returncode == 1
|
||||
assert "UtcDateTime" in result.stdout
|
||||
|
||||
def test_migration_bare_datetime_flagged(self):
|
||||
result = _run_hook_on_source(
|
||||
'ts = sa.Column("ts", sa.DateTime())\n',
|
||||
subpath=MIGRATION_PATH,
|
||||
filename="0006_example.py",
|
||||
)
|
||||
assert result.returncode == 1
|
||||
|
||||
def test_conditional_type_violation_in_body(self):
|
||||
"""``Column(DateTime() if cond else UtcDateTime())`` — body branch."""
|
||||
source = IMPORT_LINE + (
|
||||
"ts = Column(DateTime() if flag else UtcDateTime())\n"
|
||||
)
|
||||
result = _run_hook_on_source(source, subpath=MODEL_PATH)
|
||||
assert result.returncode == 1
|
||||
|
||||
def test_conditional_type_violation_in_orelse(self):
|
||||
"""``Column(UtcDateTime() if cond else DateTime())`` — orelse branch.
|
||||
|
||||
Regression guard for the asymmetric-branch bug: if the helper
|
||||
short-circuits on the first resolved branch, this case slips
|
||||
through silently.
|
||||
"""
|
||||
source = IMPORT_LINE + (
|
||||
"ts = Column(UtcDateTime() if flag else DateTime())\n"
|
||||
)
|
||||
result = _run_hook_on_source(source, subpath=MODEL_PATH)
|
||||
assert result.returncode == 1
|
||||
|
||||
def test_two_violations_in_one_file_both_reported(self):
|
||||
source = (
|
||||
"a = Column(DateTime(), nullable=False)\n"
|
||||
"b = Column(DateTime(), nullable=False)\n"
|
||||
)
|
||||
result = _run_hook_on_source(source, subpath=MODEL_PATH)
|
||||
assert result.returncode == 1
|
||||
assert "Line 1" in result.stdout
|
||||
assert "Line 2" in result.stdout
|
||||
|
||||
def test_batch_only_dirty_file_reported(self):
|
||||
dirty = "ts = Column(DateTime(), nullable=False)\n"
|
||||
clean = IMPORT_LINE + "ts = Column(UtcDateTime, nullable=False)\n"
|
||||
result = _run_hook_on_files(
|
||||
[
|
||||
(MODEL_PATH, "dirty.py", dirty),
|
||||
(MODEL_PATH, "clean.py", clean),
|
||||
]
|
||||
)
|
||||
assert result.returncode == 1
|
||||
assert "dirty.py" in result.stdout
|
||||
assert "clean.py" not in result.stdout
|
||||
|
||||
def test_path_filter_scans_src_dir(self):
|
||||
"""Positive side of the path-filter boundary."""
|
||||
result = _run_hook_on_source(
|
||||
"ts = Column(DateTime(), nullable=False)\n",
|
||||
subpath=MODEL_PATH,
|
||||
)
|
||||
assert result.returncode == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Allowed patterns
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAllowsCorrectPatterns:
|
||||
"""Patterns that must exit with returncode 0."""
|
||||
|
||||
def test_model_utcdatetime_bare_with_import(self):
|
||||
source = IMPORT_LINE + (
|
||||
"ts = Column(UtcDateTime, default=utcnow(), nullable=False)\n"
|
||||
)
|
||||
result = _run_hook_on_source(source, subpath=MODEL_PATH)
|
||||
assert result.returncode == 0
|
||||
|
||||
def test_migration_utcdatetime_call_with_import(self):
|
||||
source = IMPORT_LINE + (
|
||||
'ts = sa.Column("ts", UtcDateTime(), nullable=False)\n'
|
||||
)
|
||||
result = _run_hook_on_source(
|
||||
source, subpath=MIGRATION_PATH, filename="0006_example.py"
|
||||
)
|
||||
assert result.returncode == 0
|
||||
|
||||
def test_combo_import_order_detected(self):
|
||||
"""``from sqlalchemy_utc import utcnow, UtcDateTime`` is accepted."""
|
||||
source = IMPORT_LINE_COMBO + (
|
||||
"ts = Column(UtcDateTime, default=utcnow(), nullable=False)\n"
|
||||
)
|
||||
result = _run_hook_on_source(source, subpath=MODEL_PATH)
|
||||
assert result.returncode == 0
|
||||
|
||||
def test_empty_file(self):
|
||||
result = _run_hook_on_source("", subpath=MODEL_PATH)
|
||||
assert result.returncode == 0
|
||||
|
||||
def test_syntax_error_file_does_not_crash(self):
|
||||
"""Hook's ast.parse swallows SyntaxError and returns 0 for that file."""
|
||||
result = _run_hook_on_source("def foo(:\n", subpath=MODEL_PATH)
|
||||
assert result.returncode == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Path filter
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPathFilter:
|
||||
"""The hook must only scan files under the package prefix."""
|
||||
|
||||
def test_file_outside_both_dirs_is_skipped(self):
|
||||
"""Arbitrary path — hook does not scan."""
|
||||
with tempfile.TemporaryDirectory() as base:
|
||||
target = Path(base) / "random" / "file.py"
|
||||
target.parent.mkdir(parents=True)
|
||||
target.write_text("ts = Column(DateTime(), nullable=False)\n")
|
||||
result = subprocess.run(
|
||||
[sys.executable, str(HOOK_SCRIPT), str(target)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
assert result.returncode == 0
|
||||
|
||||
def test_tests_dir_with_database_models_segment_is_skipped(self):
|
||||
"""``tests/database/models/foo.py`` must NOT be scanned.
|
||||
|
||||
Prevents the pre-existing substring fallback from false-firing
|
||||
on test fixtures that happen to declare schema-like code.
|
||||
"""
|
||||
result = _run_hook_on_source(
|
||||
"ts = Column(DateTime(), nullable=False)\n",
|
||||
subpath=OUT_OF_SCOPE_PATH,
|
||||
)
|
||||
assert result.returncode == 0
|
||||
|
||||
def test_models_backup_substring_boundary(self):
|
||||
"""``database/models_backup/`` must NOT match."""
|
||||
result = _run_hook_on_source(
|
||||
"ts = Column(DateTime(), nullable=False)\n",
|
||||
subpath=PACKAGE_PREFIX + ("database", "models_backup"),
|
||||
)
|
||||
assert result.returncode == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Output format
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestOutputFormat:
|
||||
"""Lock in the user-facing output so refactors don't silently change it."""
|
||||
|
||||
def test_violation_stdout_includes_filename(self):
|
||||
result = _run_hook_on_source(
|
||||
"ts = Column(DateTime(), nullable=False)\n",
|
||||
subpath=MODEL_PATH,
|
||||
filename="my_model.py",
|
||||
)
|
||||
assert result.returncode == 1
|
||||
assert "my_model.py" in result.stdout
|
||||
|
||||
def test_violation_stdout_includes_fix_hint(self):
|
||||
result = _run_hook_on_source(
|
||||
"ts = Column(DateTime(), nullable=False)\n",
|
||||
subpath=MODEL_PATH,
|
||||
)
|
||||
assert "UtcDateTime" in result.stdout
|
||||
assert "sqlalchemy_utc" in result.stdout
|
||||
Reference in New Issue
Block a user