"""REC-236: EpisodeRunner pre-submit ref-integrity preflight.

A dispatch whose RESOLVED refs include a broken one must raise
RefIntegrityError BEFORE the paid submit (take.execute) — proving no money is
spent against a stub/text-file ref. A dispatch with all-valid refs must reach
submit. The submit boundary (Take.execute) is poisoned so any reach is loud.

Scope is THIS beat's resolved refs (payload start_frame / reference_images),
not a whole-tree scan.
"""
from __future__ import annotations

import asyncio
from pathlib import Path
from unittest.mock import MagicMock

import pytest

from recoil.pipeline.core.take import Beat, Scene, Take
from recoil.pipeline.core.workflow import Workflow, WorkflowStep
from recoil.pipeline.orchestrator import episode_runner as er
from recoil.pipeline.orchestrator.episode_runner import (
    EpisodeRunner,
    RefIntegrityError,
)

_PNG_MAGIC = b"\x89PNG\r\n\x1a\n"


@pytest.fixture(autouse=True)
def _project_root(tmp_path, monkeypatch):
    root = tmp_path / "projects"
    root.mkdir()
    (root / ".recoil-data-root").touch()
    (root / "fixture").mkdir()
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(root))
    monkeypatch.delenv("RECOIL_BOARD_GATE", raising=False)
    monkeypatch.setattr(er, "load_project_config", lambda _project: {})
    return root


def _valid_png(path: Path) -> Path:
    path.write_bytes(_PNG_MAGIC + b"\x00" * 2048)
    return path


def _scene() -> Scene:
    beat = Beat(
        beat_id="BATCH_004",
        max_takes=1,
        beat_metadata={"modality": "video_i2v", "scene_id": "BATCH_004"},
    )
    return Scene(
        scene_id="BATCH_004",
        beats=[beat],
        scene_metadata={"episode": "ep_001", "project": "fixture"},
    )


def _workflow_with_refs(beat_id: str, refs: list[str]) -> Workflow:
    return Workflow(
        workflow_id=f"{beat_id}_take_0",
        steps=[WorkflowStep(
            step_id="video",
            modality="video_i2v",
            payload={"shot_id": beat_id, "reference_images": list(refs)},
        )],
        global_provenance={"shot_id": beat_id, "episode": "ep_001"},
    )


def _runner(monkeypatch, *, refs: list[str], gate_enabled: bool = True):
    """EpisodeRunner whose _build_workflow_for_beat yields a controlled-ref
    workflow, with the paid submit boundary poisoned.

    gate_enabled=False neuters the preflight (for the non-vacuousness proof).
    """
    runner = EpisodeRunner(
        project="fixture", plan={}, episode="ep_001", step_runner=MagicMock()
    )

    def _build(beat, take_index, *_a, **_kw):
        return _workflow_with_refs(beat.beat_id, refs)

    monkeypatch.setattr(runner, "_build_workflow_for_beat", _build)
    # No budget gate fires: estimate is 0 (no shot metadata), spent 0 < budget.

    submitted: dict[str, bool] = {"reached": False}

    def _poison_execute(self, *_a, **_kw):
        # Flag the reach. We do NOT raise here: _dispatch_one_beat wraps
        # take.execute in a blind-retry `except Exception`, which would swallow
        # the signal. The `submitted` flag is the authoritative "submit reached"
        # signal, asserted by the caller.
        submitted["reached"] = True

    monkeypatch.setattr(Take, "execute", _poison_execute)

    if not gate_enabled:
        # Neuter the gate: make check_paths see nothing broken. Used ONLY by the
        # non-vacuousness proof to confirm the broken-ref test fails without the gate.
        monkeypatch.setattr(er, "check_paths", lambda _paths: [])

    return runner, submitted


def test_broken_ref_blocks_before_submit(tmp_path, monkeypatch):
    broken = tmp_path / "stub.png"
    broken.write_bytes(b"tiny")  # < 1024, no magic
    runner, submitted = _runner(monkeypatch, refs=[str(broken)])

    with pytest.raises(RefIntegrityError) as exc:
        asyncio.run(runner.run_scene(_scene()))

    assert not submitted["reached"], "submit must NOT be reached on a broken ref"
    assert str(broken) in str(exc.value)
    assert "git lfs pull" not in str(exc.value)  # non-LFS remediation


def test_broken_ref_leaves_no_phantom_take(tmp_path, monkeypatch):
    """REC-236 cleanup (codex finding): the preflight fails closed BEFORE a take
    is created/persisted, so a broken ref never leaves a phantom 'running' take
    (and the est release on the same path prevents leaked budget)."""
    broken = tmp_path / "stub.png"
    broken.write_bytes(b"tiny")
    runner, submitted = _runner(monkeypatch, refs=[str(broken)])
    scene = _scene()

    with pytest.raises(RefIntegrityError):
        asyncio.run(runner.run_scene(scene))

    assert not submitted["reached"]
    # No take was appended — the gate ran before beat.new_take(); nothing to
    # leave 'running' on disk.
    assert scene.beats[0].takes == [], "broken ref must not leave a phantom take"


def test_lfs_pointer_ref_blocks_with_lfs_remediation(tmp_path, monkeypatch):
    ptr = tmp_path / "pooled.png"
    ptr.write_bytes(
        b"version https://git-lfs.github.com/spec/v1\n"
        b"oid sha256:" + b"1" * 64 + b"\nsize 123\n"
    )
    runner, submitted = _runner(monkeypatch, refs=[str(ptr)])

    with pytest.raises(RefIntegrityError) as exc:
        asyncio.run(runner.run_scene(_scene()))

    assert not submitted["reached"]
    assert "git lfs pull" in str(exc.value)


def test_valid_refs_pass_preflight_and_reach_submit(tmp_path, monkeypatch):
    good = _valid_png(tmp_path / "good.png")
    runner, submitted = _runner(monkeypatch, refs=[str(good)])

    # No RefIntegrityError: valid refs pass the preflight and the run reaches
    # the (poisoned) submit boundary, flagging submitted["reached"].
    asyncio.run(runner.run_scene(_scene()))

    assert submitted["reached"], "valid refs must pass preflight and reach submit"


def test_non_vacuous_broken_ref_passes_when_gate_neutered(tmp_path, monkeypatch):
    """Non-vacuousness: with the gate neutered, the SAME broken ref reaches
    submit instead of raising — so test_broken_ref_blocks_before_submit only
    passes BECAUSE the gate is live.
    """
    broken = tmp_path / "stub.png"
    broken.write_bytes(b"tiny")
    runner, submitted = _runner(monkeypatch, refs=[str(broken)], gate_enabled=False)

    # With the gate neutered, no RefIntegrityError is raised and the SAME broken
    # ref reaches submit — proving the broken-ref test passes only via the gate.
    asyncio.run(runner.run_scene(_scene()))

    assert submitted["reached"]
