"""REC-235 Phase 3 — per-atom regeneration (atom_regen) milestone + guards.

Phase 2 shipped the import/alias smoke case (below). Phase 3 ADDS to THIS SAME file
(one file, ``-k``-scoped):
  - Case A — append-only new take (neighbors untouched, canon NOT moved);
  - Case B — the injected generator is the SOLE generation path (runtime poison);
  - Case B2 — an UNRECEIPTED workflow is rejected, beat unchanged (contract (a));
  - Case B3 — a MISSING / UNSAFE receipted artifact is rejected, beat unchanged;
  - Case C (THE MILESTONE) — reshoot ONE beat -> neighbors untouched -> swap canon ->
    re-render the cut, all while EVERY generation/dispatch entrypoint is poisoned;
  - Case D — two SEQUENTIAL regens get DISTINCT indices (@t1 then @t2, no clobber);
  - Case E — STATIC AST/source gate: atom_regen.py imports NEITHER dispatch NOR
    VideoRunner and has no .run( / .execute( / dispatch( generation-execution call.

Generation is ALWAYS injected (an already-receipted fake Workflow per execution
contract (a)). NO real generation, NO network, NO VideoRunner, NO dispatch.
"""
import sys
import pathlib

sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent.parent.parent.parent))
from recoil.core.paths import ensure_pipeline_importable  # noqa: E402
ensure_pipeline_importable()

from recoil.pipeline.core.atom_regen import (  # noqa: E402
    approve,
    regenerate_atom,
    set_canon,
)

import ast  # noqa: E402
import dataclasses  # noqa: E402
import importlib  # noqa: E402
import uuid  # noqa: E402
from pathlib import Path  # noqa: E402

import pytest  # noqa: E402
from PIL import Image  # noqa: E402

import recoil.pipeline.core.atom_regen as atom_regen  # noqa: E402

# The `recoil.pipeline.core` package re-exports the `dispatch` FUNCTION, which shadows the
# submodule attribute — so `import recoil.pipeline.core.dispatch as x` would bind the
# function, not the module. Resolve the real module via importlib so monkeypatch targets
# the canonical generation entrypoint `recoil.pipeline.core.dispatch.dispatch`.
core_dispatch = importlib.import_module("recoil.pipeline.core.dispatch")  # noqa: E402
from recoil.pipeline._lib import board_builder as bb  # noqa: E402
from recoil.pipeline._lib.plan_loader import CanonicalShot, CharacterEntry  # noqa: E402
from recoil.pipeline.core.persistence import (  # noqa: E402
    SceneVersionConflictError,
    load_scene_active_with_version,
    save_scene,
    scene_path,
)
from recoil.pipeline.core.receipts import GenerationReceipt  # noqa: E402
from recoil.pipeline.core.registry import RunResult  # noqa: E402
from recoil.pipeline.core.scene_version_store import SceneVersionStore  # noqa: E402
from recoil.pipeline.core.take import Beat, Scene, Take  # noqa: E402
import json  # noqa: E402

from recoil.pipeline._lib import ops_log  # noqa: E402
from recoil.pipeline.core.workflow import Workflow, WorkflowStep  # noqa: E402
from recoil.workspace import readmodel as rm  # noqa: E402
from recoil.workspace.tests.test_readmodel_board import _make_project  # noqa: E402


def test_atom_regen_imports_and_approve_is_set_canon():
    """Smoke case: ``approve`` is the very same object as ``set_canon``, and the two
    write-side entry points imported at module top are callable."""
    assert approve is set_canon
    for entry_point in (regenerate_atom, set_canon):
        assert callable(entry_point)


# ──────────────────────────────────────────────────────────────────────────────
# Phase 3 milestone fixtures — REAL persisted data shape + an INJECTED fake gen.
# ──────────────────────────────────────────────────────────────────────────────

# Solid RGB fills: written into the fixture panels and compared against the pixels
# sampled back out of rendered cut compositions.
_RED = (255, 0, 0)
_BLUE = (0, 0, 255)
_GREEN = (0, 255, 0)
_CYAN = (0, 255, 255)  # default color of the injected fake generator's new panel

# Scene/batch id every fixture scene in this file is persisted under.
_BATCH = "BATCH_001"
# Take-less atom URN of the beat that the regen cases reshoot.
_SH03_URN = "atom:EP001/beat/EP001_SH03"


def _shot(shot_id: str, location_id: str = "loc_bridge", char: str = "JADE") -> dict:
    """Return a shot dict in the persisted EpisodeRunner shape.

    The dict is ``dataclasses.asdict`` of a fully populated ``CanonicalShot`` (nested
    keys, NOT a flat mapping), so facet/artifact resolution has real data to read.
    ``location_id`` and ``char`` are mirrored into the ``raw["asset_data"]`` payload.
    """
    wardrobe_phase = "jade_phase_1"
    action = "grips the railing"
    raw_payload = {
        "prompt_data": {"kinetic_action": action, "shot_type": "WIDE"},
        "asset_data": {
            "props": ["railing"],
            "prop_interaction": action,
            "location_id": location_id,
            "characters": [{"char_id": char, "wardrobe_phase_id": wardrobe_phase}],
        },
    }
    canonical = CanonicalShot(
        shot_id=shot_id,
        scene_index=0,
        sequence_id=_BATCH,
        pipeline="video",
        previs_model=None,
        video_model=None,
        location_id=location_id,
        characters=[CharacterEntry(char_id=char, wardrobe_phase_id=wardrobe_phase)],
        shot_type="WIDE",
        duration_s=None,
        is_env_only=False,
        has_dialogue=False,
        aspect_ratio=None,
        raw=raw_payload,
    )
    return dataclasses.asdict(canonical)


def _receipt(output_path: str) -> GenerationReceipt:
    """Build a successful ``video_i2v`` receipt whose ``run_result.output_path`` is
    ``output_path`` (same shape as ``test_composition_render._receipt``)."""
    return GenerationReceipt(
        receipt_id="rcpt",
        modality="video_i2v",
        caller_id="test",
        # No project/episode/shot scoping is attached to the fixture receipt.
        project=None,
        episode=None,
        shot_id=None,
        timestamp_utc="2026-06-23T00:00:00Z",
        run_result=RunResult(
            id="rr",
            modality="video_i2v",
            output_path=output_path,
            success=True,
        ),
    )


def _write_panel(project_root: Path, rel: str, color: tuple[int, int, int]) -> None:
    """Save a 24x24 single-color RGB image at ``project_root / rel``, creating any
    missing parent directories first."""
    target = project_root.joinpath(rel)
    target.parent.mkdir(parents=True, exist_ok=True)
    panel = Image.new("RGB", (24, 24), color)
    panel.save(target)


def _take(beat_id: str, idx: int, output_path: str) -> Take:
    """Build a succeeded Take wrapping a one-step workflow whose ``video_i2v`` step
    already holds a receipt for ``output_path``.

    Ids are derived from the arguments: ``{beat_id}_take_{idx}`` for the take and
    ``{beat_id}_wf{idx}`` for its workflow.
    """
    video_step = WorkflowStep(step_id="video", modality="video_i2v", payload={})
    video_step.receipt = _receipt(output_path)
    video_step.status = "succeeded"
    return Take(
        take_id=f"{beat_id}_take_{idx}",
        take_index=idx,
        workflow=Workflow(workflow_id=f"{beat_id}_wf{idx}", steps=[video_step]),
        status="succeeded",
    )


def _canon_beat(beat_id: str, output_path: str) -> Beat:
    """Build a Beat holding exactly one take (index 0, pointing at ``output_path``)
    with that take set as the beat's primary (canon) take."""
    first_take = _take(beat_id, 0, output_path)
    metadata = {"shot": _shot(beat_id), "scene_id": _BATCH}
    return Beat(
        beat_id=beat_id,
        takes=[first_take],
        primary_take_id=first_take.take_id,
        beat_metadata=metadata,
    )


def _persist_three_beats(tmp_path, monkeypatch) -> tuple[str, Path]:
    """Create a project and persist BATCH_001 with three beats backed by real panels.

    SH02 is RED, SH03 is BLUE (the reshoot target) and SH04 is GREEN; each beat's canon
    is its own take 0. The scene is written with ``save_scene`` as a flat body.
    Returns ``(project, project_root)``.
    """
    project, project_root, _scenes, _storyboards = _make_project(tmp_path, monkeypatch)
    panels = (
        ("EP001_SH02", _RED),
        ("EP001_SH03", _BLUE),
        ("EP001_SH04", _GREEN),
    )
    # Write every panel to disk first, then build the beats that reference them.
    for beat_id, fill in panels:
        _write_panel(project_root, f"renders/ep_001/{beat_id}/t0.png", fill)
    beats = [
        _canon_beat(beat_id, f"renders/ep_001/{beat_id}/t0.png")
        for beat_id, _fill in panels
    ]
    save_scene(Scene(scene_id=_BATCH, beats=beats), scene_path(project, 1, _BATCH))
    return project, project_root


# ── Injected generators (execution contract (a): already-receipted Workflows) ───


def _receipted_workflow(output_path: str) -> Workflow:
    """Return a one-step ``video_i2v`` Workflow that looks already executed.

    The step is marked ``succeeded`` and carries a receipt whose
    ``run_result.output_path`` is ``output_path``; the workflow id gets a fresh uuid4
    suffix on every call. Nothing is executed here — the fake stands in for
    production's internal execution.
    """
    executed_step = WorkflowStep(step_id="video", modality="video_i2v", payload={})
    executed_step.receipt = _receipt(output_path)
    executed_step.status = "succeeded"
    workflow_id = f"regen_wf_{uuid.uuid4().hex}"
    return Workflow(workflow_id=workflow_id, steps=[executed_step])


def _make_fake_generate(project_root: Path, color: tuple[int, int, int] = _CYAN):
    """Return an index-agnostic fake generator ``(beat) -> Workflow``.

    Each call of the returned function writes a NEW solid-``color`` panel under
    ``renders/ep_001/<beat_id>/`` with a unique uuid4-hex file name (the name does not
    encode any take index) and returns an already-receipted Workflow whose receipt
    points at that file. The function body calls only ``_write_panel`` and
    ``_receipted_workflow`` — no VideoRunner, no dispatch.
    """
    def fake_generate(beat: Beat) -> Workflow:
        panel_name = f"regen_{uuid.uuid4().hex}.png"
        rel = f"renders/ep_001/{beat.beat_id}/{panel_name}"
        _write_panel(project_root, rel, color)
        return _receipted_workflow(rel)

    return fake_generate


def boom_generate(*a, **k) -> Workflow:
    """Poison generator: accepts any arguments and always raises ``AssertionError``, so
    a call that reaches it fails loudly instead of producing a workflow."""
    message = "regenerate must not call real generation"
    raise AssertionError(message)


def unreceipted_generate(beat: Beat) -> Workflow:
    """Return a workflow whose single ``video_i2v`` step was never executed.

    No receipt and no status are assigned to the step, so it keeps whatever defaults
    ``WorkflowStep`` gives it — i.e. there is no receipted ``output_path``, which is an
    execution-contract-(a) violation. ``beat`` is ignored.
    """
    pending_step = WorkflowStep(step_id="video", modality="video_i2v", payload={})
    workflow_id = f"unreceipted_{uuid.uuid4().hex}"
    return Workflow(workflow_id=workflow_id, steps=[pending_step])


def missing_artifact_generate(beat: Beat) -> Workflow:
    """Return a receipted workflow whose ``output_path`` is a non-empty relative path
    that this helper never writes to disk (the artifact-existence half of the guard).
    ``beat`` is ignored."""
    never_written = "renders/ep_001/EP001_SH03/never_written.png"
    return _receipted_workflow(never_written)


def unsafe_artifact_generate(beat: Beat) -> Workflow:
    """Return a receipted workflow whose ``output_path`` climbs out of the project
    root via ``..`` segments (the out-of-root half of the guard). ``beat`` is ignored."""
    escaping_path = "renders/../../escape.png"
    return _receipted_workflow(escaping_path)


# ── Re-load + cell-sampling helpers ─────────────────────────────────────────────


def _reload_beat(project: str, beat_id: str) -> Beat:
    """Load the ACTIVE BATCH_001 scene fresh and return its first beat whose
    ``beat_id`` matches (``StopIteration`` propagates when none does)."""
    active_scene, _active_version = load_scene_active_with_version(project, 1, _BATCH)
    matching = (beat for beat in active_scene.beats if beat.beat_id == beat_id)
    return next(matching)


def _cell_geometry(slots: int) -> tuple[int, int, int, int]:
    """Return ``(width, height, cell_w, cell_h)`` for the ``slots`` grid layout.

    Everything is derived from the ``bb.GRID_LAYOUTS[slots]`` entry — a
    ``"<W>x<H>"`` size string plus column and row counts — using integer division.
    """
    size_spec, n_cols, n_rows = bb.GRID_LAYOUTS[slots]
    width, height = map(int, size_spec.split("x"))
    cell_w = width // n_cols
    cell_h = height // n_rows
    return width, height, cell_w, cell_h


def _sample_cell(project_root: Path, rel_path: str, index: int, slots: int = 4) -> tuple:
    """Return the RGB center pixel of row-major cell ``index`` in a rendered composition.

    cell-0 = top-left member, cell-1 = top-right — the ``test_composition_render``
    sampling idiom, generalized via ``divmod(index, cols)`` to reach lower-row cells.

    Args:
        project_root: Directory that ``rel_path`` is relative to.
        rel_path: Project-relative path of the rendered composition image.
        index: Row-major cell index to sample.
        slots: Key into ``bb.GRID_LAYOUTS`` selecting the grid geometry.

    Returns:
        The pixel value at the cell center after converting the image to RGB.
    """
    _, _, cell_w, cell_h = _cell_geometry(slots)
    cols = bb.GRID_LAYOUTS[slots][1]
    row, col = divmod(index, cols)
    center = (col * cell_w + cell_w // 2, row * cell_h + cell_h // 2)
    # Open through a context manager so the underlying file handle is released
    # deterministically; convert() already loads the pixel data, so no explicit
    # load() call is needed before sampling.
    with Image.open(project_root / rel_path) as composition:
        return composition.convert("RGB").getpixel(center)


# ──────────────────────────────────────────────────────────────────────────────
# Case A — append-only new take; neighbors untouched; canon NOT moved.
# ──────────────────────────────────────────────────────────────────────────────


def test_regenerate_appends_new_take_append_only(tmp_path, monkeypatch):
    """Case A: a regen of SH03 appends @t1, leaves take 0 and the canon pointer as they
    were, and does not touch the neighboring beats SH02 / SH04."""
    project, project_root = _persist_three_beats(tmp_path, monkeypatch)
    generator = _make_fake_generate(project_root)
    neighbors = ("EP001_SH02", "EP001_SH04")

    # Snapshot the target's take 0 and each neighbor's resolved canon artifact up front.
    original_take0 = _reload_beat(project, "EP001_SH03").takes[0].to_dict()
    neighbor_artifacts = {
        beat_id: rm.resolve_atom_version(f"atom:EP001/beat/{beat_id}@t0", project).artifact
        for beat_id in neighbors
    }

    # Reshoot SH03: the returned URN names the appended take.
    new_urn = regenerate_atom(project, _SH03_URN, generator)
    assert new_urn == "atom:EP001/beat/EP001_SH03@t1"

    # Target beat: two takes now, take 0 serializes identically, canon still take 0
    # (regenerate does NOT approve).
    target = _reload_beat(project, "EP001_SH03")
    assert [take.take_index for take in target.takes] == [0, 1]
    assert target.takes[0].to_dict() == original_take0
    assert target.primary_take_id == "EP001_SH03_take_0"

    # Neighbors: still a single take, same canon pointer, same resolved artifact.
    for beat_id, artifact_before in neighbor_artifacts.items():
        neighbor = _reload_beat(project, beat_id)
        assert len(neighbor.takes) == 1
        assert neighbor.primary_take_id == f"{beat_id}_take_0"
        resolved = rm.resolve_atom_version(f"atom:EP001/beat/{beat_id}@t0", project)
        assert resolved.artifact == artifact_before


# ──────────────────────────────────────────────────────────────────────────────
# Case B — the injected generator is the SOLE generation path (runtime poison).
# ──────────────────────────────────────────────────────────────────────────────


def test_regenerate_injection_is_the_only_generation_path(tmp_path, monkeypatch):
    """Case B: with ``core_dispatch.dispatch`` replaced by a raising stub, a regen
    through the injected fake still succeeds, and a raising injected generator
    propagates its ``AssertionError`` to the caller."""
    project, project_root = _persist_three_beats(tmp_path, monkeypatch)
    generator = _make_fake_generate(project_root)

    def _poisoned(*a, **k):
        raise AssertionError("regenerate must NEVER call a real generation entrypoint")

    # Replace the canonical generation entrypoint with the raising stub: the regen
    # completing anyway shows the take came from the injected callable.
    monkeypatch.setattr(core_dispatch, "dispatch", _poisoned)
    new_urn = regenerate_atom(project, _SH03_URN, generator)
    assert new_urn == "atom:EP001/beat/EP001_SH03@t1"

    # An injected generator that raises is not masked by any fallback path.
    with pytest.raises(AssertionError):
        regenerate_atom(project, _SH03_URN, boom_generate)


# ──────────────────────────────────────────────────────────────────────────────
# Case B2 — an UNRECEIPTED workflow is rejected; beat fully unchanged (contract (a)).
# ──────────────────────────────────────────────────────────────────────────────


def test_regenerate_rejects_unreceipted_workflow_beat_unchanged(tmp_path, monkeypatch):
    """Case B2: a generator returning a workflow with no receipt makes
    ``regenerate_atom`` raise ``ValueError`` and leaves the persisted beat unchanged."""
    project, _project_root = _persist_three_beats(tmp_path, monkeypatch)

    snapshot = _reload_beat(project, "EP001_SH03")
    expected_take_count = len(snapshot.takes)
    expected_take0 = snapshot.takes[0].to_dict()
    expected_primary = snapshot.primary_take_id

    # No step carries a receipt with a truthy run_result.output_path -> rejected.
    with pytest.raises(ValueError):
        regenerate_atom(project, _SH03_URN, unreceipted_generate)

    # Re-read from disk: take count, take 0 body and canon pointer all match the snapshot.
    reloaded = _reload_beat(project, "EP001_SH03")
    assert len(reloaded.takes) == expected_take_count
    assert reloaded.takes[0].to_dict() == expected_take0
    assert reloaded.primary_take_id == expected_primary


# ──────────────────────────────────────────────────────────────────────────────
# Case B3 — a MISSING / UNSAFE receipted artifact is rejected; beat unchanged.
# ──────────────────────────────────────────────────────────────────────────────


@pytest.mark.parametrize(
    "gen",
    [missing_artifact_generate, unsafe_artifact_generate],
    ids=["missing_artifact", "unsafe_artifact"],
)
def test_regenerate_rejects_missing_or_unsafe_artifact_beat_unchanged(
    tmp_path, monkeypatch, gen
):
    """Case B3: a receipted workflow whose artifact is absent on disk, or whose path
    escapes the project root, makes ``regenerate_atom`` raise ``ValueError`` and leaves
    the persisted beat unchanged."""
    project, _project_root = _persist_three_beats(tmp_path, monkeypatch)

    snapshot = _reload_beat(project, "EP001_SH03")
    expected_take_count = len(snapshot.takes)
    expected_take0 = snapshot.takes[0].to_dict()
    expected_primary = snapshot.primary_take_id

    # The rejection must surface from the regen call itself rather than from a
    # later render.
    with pytest.raises(ValueError):
        regenerate_atom(project, _SH03_URN, gen)

    reloaded = _reload_beat(project, "EP001_SH03")
    assert len(reloaded.takes) == expected_take_count
    assert reloaded.takes[0].to_dict() == expected_take0
    assert reloaded.primary_take_id == expected_primary


# ──────────────────────────────────────────────────────────────────────────────
# Case C (THE MILESTONE) — reshoot, neighbors untouched, swap canon, re-render cut.
# ──────────────────────────────────────────────────────────────────────────────


def test_milestone_reshoot_swap_canon_rerender_cut(tmp_path, monkeypatch):
    """Case C (milestone): reshoot SH03, confirm neighbors and the pre-swap cut are
    unchanged, move canon to the new take, and re-render a cut that shows it — all
    while the dispatch / board-generation entrypoints are replaced by a raising stub."""
    project, project_root = _persist_three_beats(tmp_path, monkeypatch)
    generator = _make_fake_generate(project_root)
    beat_ids = ("EP001_SH02", "EP001_SH03", "EP001_SH04")
    neighbor_ids = ("EP001_SH02", "EP001_SH04")

    # 1. Poison generation for the whole case: finishing while these stubs are in
    #    place is the zero-generation proof.
    def _poisoned(*a, **k):
        raise AssertionError("the milestone must fire ZERO generation")

    poison_targets = (
        (bb, "dispatch"),
        (bb, "build_and_dispatch_board"),
        (bb, "render_board_finish"),
        (core_dispatch, "dispatch"),
    )
    for module, attribute in poison_targets:
        monkeypatch.setattr(module, attribute, _poisoned)

    def _panel(beat_id):
        return project_root / f"renders/ep_001/{beat_id}/t0.png"

    def _canon_artifact(beat_id):
        return rm.resolve_atom_version(f"atom:EP001/beat/{beat_id}@t0", project).artifact

    # 2. BEFORE cut over the three take-0 URNs: the SH03 cell (index 1) is BLUE.
    cut = rm.build_cut_composition(
        [f"atom:EP001/beat/{beat_id}@t0" for beat_id in beat_ids],
        layout={"slots": 4},
    )
    rel_before = bb.render_composition_view(
        cut, project, out_path="prep/ep_001/storyboards/cut_before.png"
    )
    assert _sample_cell(project_root, rel_before, 1) == _BLUE

    # Snapshot neighbor artifacts, then their on-disk panel bytes, before the reshoot.
    artifacts_before = {beat_id: _canon_artifact(beat_id) for beat_id in neighbor_ids}
    bytes_before = {beat_id: _panel(beat_id).read_bytes() for beat_id in neighbor_ids}

    # 3. Reshoot ONE beat: an appended @t1 (CYAN by the fake generator's default).
    new_urn = regenerate_atom(project, _SH03_URN, generator)
    assert new_urn == "atom:EP001/beat/EP001_SH03@t1"

    # 4. Neighbors untouched (artifacts, then bytes); re-rendering the SAME cut still
    #    gives BLUE at SH03 because canon has not moved yet.
    for beat_id in neighbor_ids:
        assert _canon_artifact(beat_id) == artifacts_before[beat_id]
    for beat_id in neighbor_ids:
        assert _panel(beat_id).read_bytes() == bytes_before[beat_id]
    rel_still = bb.render_composition_view(
        cut, project, out_path="prep/ep_001/storyboards/cut_before_again.png"
    )
    assert _sample_cell(project_root, rel_still, 1) == _BLUE

    # 5. Swap canon (approve): the read model now reports @t1 as canon.
    set_canon(project, _SH03_URN, new_urn)
    assert rm.get_atom(_SH03_URN, project).canon_urn == new_urn

    # 6. Build a new cut by FOLLOWING each beat's canon pointer, and render it.
    canon_urns_after = [
        rm.get_atom(f"atom:EP001/beat/{beat_id}", project).canon_urn
        for beat_id in beat_ids
    ]
    cut_after = rm.build_cut_composition(canon_urns_after, layout={"slots": 4})
    rel_after = bb.render_composition_view(
        cut_after, project, out_path="prep/ep_001/storyboards/cut_after.png"
    )

    # 7. The SH03 cell is now CYAN; the neighbor cells keep their original colors.
    for cell_index, expected_color in ((1, _CYAN), (0, _RED), (2, _GREEN)):
        assert _sample_cell(project_root, rel_after, cell_index) == expected_color


# ──────────────────────────────────────────────────────────────────────────────
# Case D — two SEQUENTIAL regens get DISTINCT indices (@t1 then @t2, no clobber).
# ──────────────────────────────────────────────────────────────────────────────


def test_two_sequential_regens_distinct_indices_no_clobber(tmp_path, monkeypatch):
    """Case D: two back-to-back regens of SH03 yield @t1 then @t2, earlier takes keep
    their serialized bodies, and canon stays on take 0."""
    project, project_root = _persist_three_beats(tmp_path, monkeypatch)
    generator = _make_fake_generate(project_root)

    def _take_body(index):
        # Always re-read from disk so the comparison is against persisted state.
        return _reload_beat(project, "EP001_SH03").takes[index].to_dict()

    # 1. Take 0 as persisted before any regen.
    original_take0 = _take_body(0)

    # 2. First regen lands at @t1; remember its body.
    first_urn = regenerate_atom(project, _SH03_URN, generator)
    assert first_urn == "atom:EP001/beat/EP001_SH03@t1"
    take1_after_first = _take_body(1)

    # 3. Second (single-threaded) regen gets a FRESH index, @t2, not a reused 1.
    second_urn = regenerate_atom(project, _SH03_URN, generator)
    assert second_urn == "atom:EP001/beat/EP001_SH03@t2"
    assert first_urn != second_urn

    # 4. Three takes (0/1/2); takes 0 and 1 unchanged; take 1 differs from take 2;
    #    canon is still take 0 because no regen was approved.
    final_beat = _reload_beat(project, "EP001_SH03")
    bodies = [take.to_dict() for take in final_beat.takes]
    assert [take.take_index for take in final_beat.takes] == [0, 1, 2]
    assert bodies[0] == original_take0
    assert bodies[1] == take1_after_first
    assert bodies[1] != bodies[2]
    assert final_beat.primary_take_id == "EP001_SH03_take_0"


# ──────────────────────────────────────────────────────────────────────────────
# Case E — STATIC zero-gen gate (import/AST), defense-in-depth beyond Case B.
# ──────────────────────────────────────────────────────────────────────────────


def test_atom_regen_imports_no_dispatch_or_videorunner():
    """Deterministic static assertion on the SOURCE of atom_regen.py (does NOT execute
    it): closes the gap Case B's runtime poison misses — a module that captured a DIRECT
    import of dispatch/VideoRunner could still call it without going through
    recoil.pipeline.core.dispatch.dispatch.

    Three layers are checked:
      1. ``import X`` — X must not be a forbidden module and must not bind a
         forbidden name.
      2. ``from M import N`` — M (with RELATIVE imports resolved against atom_regen's
         own package) must not be a forbidden module, ``M.N`` must not be a forbidden
         module either (submodule imported by name), and N / its alias must not be a
         forbidden name.
      3. The raw source must not contain a generation-execution call token.
    """
    source = Path(atom_regen.__file__).read_text(encoding="utf-8")
    tree = ast.parse(source)

    forbidden_names = {"dispatch", "VideoRunner", "build_and_dispatch_board"}
    forbidden_modules = {
        "recoil.pipeline.core.dispatch",
        "recoil.pipeline.core.runners.video_runner",
    }
    # Package that relative imports written inside atom_regen.py are anchored on.
    package_parts = [part for part in (atom_regen.__package__ or "").split(".") if part]

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                bound = alias.asname or alias.name
                last = alias.name.split(".")[-1]
                assert alias.name not in forbidden_modules, (
                    f"atom_regen.py imports the forbidden module {alias.name!r} "
                    f"(line {node.lineno})"
                )
                assert bound not in forbidden_names and last not in forbidden_names, (
                    f"atom_regen.py binds a forbidden name via `import {alias.name}` "
                    f"(line {node.lineno})"
                )
        elif isinstance(node, ast.ImportFrom):
            # TOLERATE `from recoil.workspace.readmodel import ...` — the receipt guard
            # reads the artifact via the read model's OWN helpers (the locators +
            # _safe_artifact_relpath). readmodel is NOT a forbidden module.
            if node.level:
                # Relative import: `level` leading dots climb from the module's own
                # package, so `from .dispatch import x` cannot dodge the module check.
                keep = max(len(package_parts) - (node.level - 1), 0)
                anchor = package_parts[:keep]
                module = ".".join(anchor + ([node.module] if node.module else []))
            else:
                module = node.module
            assert module not in forbidden_modules, (
                f"atom_regen.py imports from the forbidden module {module!r} "
                f"(line {node.lineno})"
            )
            for alias in node.names:
                bound = alias.asname or alias.name
                assert alias.name not in forbidden_names and bound not in forbidden_names, (
                    f"atom_regen.py imports the forbidden name {alias.name!r} from "
                    f"{module!r} (line {node.lineno})"
                )
                # `from pkg.runners import video_runner` imports a forbidden MODULE
                # under a name that is not itself in forbidden_names.
                qualified = f"{module}.{alias.name}"
                assert qualified not in forbidden_modules, (
                    f"atom_regen.py imports the forbidden module {qualified!r} as a "
                    f"submodule (line {node.lineno})"
                )

    # Raw-source substring check, scoped to the EXACT generation-execution tokens. These
    # do NOT false-positive on run_result / output_path / _terminal_artifact /
    # _safe_artifact_relpath (none of which contain '.run(' / '.execute(' / 'dispatch(').
    for token in (".run(", ".execute(", "dispatch("):
        assert token not in source, (
            f"atom_regen.py source contains a forbidden generation-execution token "
            f"{token!r} — it must compose append+persist only "
            f"(Beat.new_take / save_active_scene_status are its only mutators)"
        )


# ══════════════════════════════════════════════════════════════════════════════
# REC-241 SAFE SCOPE — orphan / version-conflict policy + concurrency safety.
#
# regenerate_atom runs the (injected, already-receipted) generate(beat) BEFORE its
# version-checked locked append. If the active version moves between the internal
# load and the save, save_active_scene_status raises SceneVersionConflictError and
# writes NOTHING — leaving the SPENT artifact generate() produced ORPHANED.
# Contract (c): RECORD the orphan through the established ops-log seam BEFORE
# re-raising. These cases use the SAME fake-generate / receipted-fixture style as
# above — NO real generation, NO spend.
# ══════════════════════════════════════════════════════════════════════════════


def _persist_three_beats_versioned(tmp_path, monkeypatch) -> tuple[str, Path]:
    """Persist the three-beat batch, then add a v2 CANDIDATE next to it.

    Starts from ``_persist_three_beats`` and writes a second scene with the same three
    beat ids / artifact paths via ``SceneVersionStore.write_scene_candidate``. Per the
    original fixture notes the active pointer stays at v1 until a later
    ``store.conform(_, 2)`` moves it, which is how the conflict cases provoke a version
    mismatch. Returns ``(project, project_root)``.
    """
    project, project_root = _persist_three_beats(tmp_path, monkeypatch)
    # Fresh take objects, identical structure to the persisted v1 body.
    candidate_beats = [
        _canon_beat(beat_id, f"renders/ep_001/{beat_id}/t0.png")
        for beat_id in ("EP001_SH02", "EP001_SH03", "EP001_SH04")
    ]
    candidate = Scene(scene_id=_BATCH, beats=candidate_beats)
    version_store = SceneVersionStore(project, 1)
    version_store.write_scene_candidate(_BATCH, candidate)  # active stays at v1
    return project, project_root


def _reload_beat_v1(project: str, beat_id: str) -> Beat:
    """Load the flat scene file at ``scene_path(project, 1, _BATCH)`` and return the
    beat with ``beat_id``.

    This bypasses the active-version pointer on purpose: the flat file is treated as
    the v1 body, so "no phantom take" checks still look at the version the caller wrote
    into even after the active pointer was conformed to v2.
    """
    from recoil.pipeline.core.persistence import load_scene, scene_path as _flat_path

    v1_scene = load_scene(_flat_path(project, 1, _BATCH))
    matching = (beat for beat in v1_scene.beats if beat.beat_id == beat_id)
    return next(matching)


def _read_orphan_records(ops_log_path: Path) -> list[dict]:
    """Return every ``regen_orphan_artifact`` event found in the ops-log JSONL file.

    A missing file yields an empty list. Blank / whitespace-only lines are skipped;
    every other line is parsed as JSON and kept only when its ``"event"`` field equals
    ``"regen_orphan_artifact"``. File order is preserved.
    """
    if not ops_log_path.exists():
        return []
    stripped_lines = map(str.strip, ops_log_path.read_text(encoding="utf-8").splitlines())
    parsed = (json.loads(text) for text in stripped_lines if text)
    return [entry for entry in parsed if entry.get("event") == "regen_orphan_artifact"]


def _conforming_generate(project_root: Path, project: str):
    """Return a fake generator that moves the version pointer AFTER producing its take.

    The returned function first runs the plain fake generator (writing a real panel and
    building a receipted workflow), then calls ``SceneVersionStore(project, 1)
    .conform(_BATCH, 2)`` before handing the workflow back. This models the TOCTOU
    window: the artifact is already spent when the caller's save sees a moved version.
    """
    plain_generate = _make_fake_generate(project_root)

    def gen(beat: Beat) -> Workflow:
        # Spend the artifact FIRST (contract (a): already receipted) ...
        workflow = plain_generate(beat)
        # ... then move the pointer while the caller is still mid-flight.
        store = SceneVersionStore(project, 1)
        store.conform(_BATCH, 2)
        return workflow

    return gen


# ──────────────────────────────────────────────────────────────────────────────
# Case F — post-generation version conflict → orphan recorded + raised + no phantom.
# ──────────────────────────────────────────────────────────────────────────────


def test_post_generation_conflict_records_orphan_and_reraises(tmp_path, monkeypatch):
    """Case F: when the version moves after generation, ``regenerate_atom`` raises
    ``SceneVersionConflictError``, exactly one orphan event is written to the ops log,
    and the v1 body gains no take."""
    project, project_root = _persist_three_beats_versioned(tmp_path, monkeypatch)
    log_file = tmp_path / "ops.log.jsonl"
    monkeypatch.setenv("RECOIL_OPS_LOG_PATH", str(log_file))

    snapshot = _reload_beat(project, "EP001_SH03")
    expected_take_count = len(snapshot.takes)
    expected_take0 = snapshot.takes[0].to_dict()
    expected_primary = snapshot.primary_take_id

    # generate() spends the artifact, THEN the active version moves to v2, so the
    # save made against version 1 is refused.
    with pytest.raises(SceneVersionConflictError):
        regenerate_atom(project, _SH03_URN, _conforming_generate(project_root, project))

    # 1. The spent artifact was recorded as an orphan, with beat, batch, cause and the
    #    two version numbers.
    orphans = _read_orphan_records(log_file)
    assert len(orphans) == 1, "exactly one orphan recorded for the spent artifact"
    (orphan,) = orphans
    expected_fields = {
        "beat_id": "EP001_SH03",
        "cause": "scene_version_conflict",
        "batch_id": _BATCH,
        "expected_version": 1,
        "actual_version": 2,
    }
    for field, value in expected_fields.items():
        assert orphan[field] == value
    # The recorded path is the project-relative regen panel, and it exists on disk.
    assert orphan["artifact"].startswith("renders/ep_001/EP001_SH03/regen_")
    assert (project_root / orphan["artifact"]).exists()

    # 2. Read v1 DIRECTLY (the active pointer now resolves v2): no phantom take.
    v1_beat = _reload_beat_v1(project, "EP001_SH03")
    assert len(v1_beat.takes) == expected_take_count
    assert v1_beat.takes[0].to_dict() == expected_take0
    assert v1_beat.primary_take_id == expected_primary


# ──────────────────────────────────────────────────────────────────────────────
# Case F2 — NON-VACUOUSNESS: neuter the orphan-record call → the orphan assertion fails.
# ──────────────────────────────────────────────────────────────────────────────


def test_orphan_record_is_load_bearing_non_vacuous(tmp_path, monkeypatch):
    """Case F2: with ``ops_log.write`` stubbed out, the conflict still raises but no
    orphan event reaches the log — so Case F's orphan assertion is not vacuous."""
    project, project_root = _persist_three_beats_versioned(tmp_path, monkeypatch)
    log_file = tmp_path / "ops.log.jsonl"
    monkeypatch.setenv("RECOIL_OPS_LOG_PATH", str(log_file))

    def _discard(*a, **k):
        return None

    # Neuter the orphan-record path (ops_log.write as reached through atom_regen).
    monkeypatch.setattr(atom_regen.ops_log, "write", _discard)

    # Fail-loud is independent of the record: the conflict is still raised ...
    with pytest.raises(SceneVersionConflictError):
        regenerate_atom(project, _SH03_URN, _conforming_generate(project_root, project))

    # ... but nothing was captured, because the writer was a no-op.
    recorded = _read_orphan_records(log_file)
    assert recorded == []


# ──────────────────────────────────────────────────────────────────────────────
# Case G (CONCURRENCY) — two stale callers at the same loaded_version: one append wins,
# the loser's save raises SceneVersionConflictError AND its orphan is recorded.
# ──────────────────────────────────────────────────────────────────────────────


def test_two_stale_callers_one_wins_loser_orphan_recorded(tmp_path, monkeypatch):
    """Case G: a deterministic two-caller interleave — the first regen appends @t1 to
    v1, the second hits a version conflict, and exactly one orphan (the loser's
    artifact) is recorded while v1 holds only takes 0 and 1."""
    project, project_root = _persist_three_beats_versioned(tmp_path, monkeypatch)
    log_file = tmp_path / "ops.log.jsonl"
    monkeypatch.setenv("RECOIL_OPS_LOG_PATH", str(log_file))

    # Modeled scenario: both callers target active v1 and each spends an artifact.
    #   WINNER — plain fake generate; runs first while v1 is still active; succeeds.
    #   LOSER  — its generate spends an artifact and THEN conforms active to v2, so
    #            its own save against version 1 is refused.
    # Running them one after the other keeps the interleave deterministic.
    generators = {
        "winner": _make_fake_generate(project_root),
        "loser": _conforming_generate(project_root, project),
    }

    winning_urn = regenerate_atom(project, _SH03_URN, generators["winner"])
    assert winning_urn == "atom:EP001/beat/EP001_SH03@t1", "exactly one append won (@t1 on v1)"

    with pytest.raises(SceneVersionConflictError):
        regenerate_atom(project, _SH03_URN, generators["loser"])

    # Exactly one orphan: the loser's spent artifact, still present on disk.
    orphans = _read_orphan_records(log_file)
    assert len(orphans) == 1
    loser_orphan = orphans[0]
    assert loser_orphan["beat_id"] == "EP001_SH03"
    assert loser_orphan["cause"] == "scene_version_conflict"
    assert (project_root / loser_orphan["artifact"]).exists()

    # Read v1 DIRECTLY (the loser conformed active to v2): only @t0 + @t1 exist and
    # canon is still take 0.
    v1_beat = _reload_beat_v1(project, "EP001_SH03")
    assert [take.take_index for take in v1_beat.takes] == [0, 1]
    assert v1_beat.primary_take_id == "EP001_SH03_take_0"
