"""EpisodeRunner — mocked dispatch + budget exhaustion + Scene JSON write."""
import asyncio
import dataclasses
import json
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch

import pytest

from recoil.pipeline.orchestrator.episode_runner import (
    EpisodeRunner, BudgetExhaustedError,
)
from recoil.pipeline._lib.derivation_sha import shotset_hash
from recoil.pipeline._lib.grouping import GroupingContext, get_grouping
from recoil.pipeline._lib.plan_loader import CanonicalPlan, CanonicalShot
from recoil.pipeline._lib.scene_clusterer import Batch
from recoil.pipeline.core.runners.video_runner import VideoRunner
from recoil.pipeline.core.persistence import (
    SceneVersionConflictError,
    load_manifest,
    load_scene,
    load_scene_active_with_version,
    save_scene,
    scene_path,
    scene_version_path,
)
from recoil.pipeline.core.receipts import GenerationReceipt, utc_now_iso8601
from recoil.pipeline.core.registry import RunResult
from recoil.pipeline.core.scene_version_store import SceneVersionStore
from recoil.pipeline.core.take import Beat, Scene, Take
from recoil.pipeline.core.workflow import Workflow, WorkflowStep
from recoil.execution.step_runner import StepRunner
from recoil.execution.types import GenerationResult
from recoil.pipeline.orchestrator.coverage_planner import CoveragePass, CoverageSegment
from recoil.pipeline._lib import derivation_manifest


def _plan():
    return {
        "sequences": {
            "SEQ11": {"shots": [{"shot_id": "EP001_SH01"},
                                {"shot_id": "EP001_SH02"}]},
        }
    }


def _canonical_shot(shot_id: str, scene_index: int = 1) -> CanonicalShot:
    return CanonicalShot(
        shot_id=shot_id,
        scene_index=scene_index,
        sequence_id="SEQ11",
        pipeline="video",
        previs_model=None,
        video_model="seeddance-2.0",
        location_id="LOC_A",
        characters=[],
        shot_type="MS",
        duration_s=2.0,
        is_env_only=False,
        has_dialogue=False,
        aspect_ratio="9:16",
        quality=None,
        cinematography=None,
        raw={},
    )


def test_init_scene_pure(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
    )
    scene = runner.init_scene("SEQ11")
    assert scene.scene_id == "SEQ11"
    assert len(scene.beats) == 2


def test_dry_run_does_not_persist_scene_state(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001", concurrency=1,
    )
    scene = runner.init_scene("SEQ11")
    asyncio.run(runner.run_scene(scene, dry_run=True))
    p = scene_path("p", "ep_001", "SEQ11")
    # REC-100: dry-run is read-only — it must NOT persist the scene JSON or any
    # dry_run takes (a later live run would otherwise load phantom takes, demote
    # them to failed, and report the run failed without ever dispatching).
    assert not p.exists(), "dry-run must not persist scene state (REC-100)"


def test_recover_stale_takes(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
    )
    scene = runner.init_scene("SEQ11")
    # Inject a fake stale Take.
    from recoil.pipeline.core.workflow import Workflow, WorkflowStep
    wf = Workflow(workflow_id="w",
                  steps=[WorkflowStep(step_id="kf", modality="image_t2i",
                                      payload={"x": 1})])
    t = scene.beats[0].new_take(workflow=wf)
    t.status = "running"
    t.created_at = "2020-01-01T00:00:00+00:00"
    n = runner.recover_stale_takes(scene, staleness_minutes=5)
    assert n == 1
    assert t.status == "failed"


def test_run_episode_initializes_scenes_and_restamps_max_takes(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    stale_scene = Scene(
        scene_id="SEQ11",
        beats=[
            Beat(beat_id="EP001_SH01", max_takes=1),
            Beat(beat_id="EP001_SH02", max_takes=1),
        ],
        scene_metadata={"episode": "ep_001", "project": "p"},
    )
    path = scene_path("p", "ep_001", "SEQ11")
    save_scene(stale_scene, path)

    runner = EpisodeRunner(
        project="p",
        plan=_plan(),
        casting={},
        episode="ep_001",
        max_takes=5,
        concurrency=1,
    )
    seen_max_takes = []

    async def _capture_scene(scene, **kwargs):  # REC-231 Phase 4: tolerate expected_version
        seen_max_takes.append([beat.max_takes for beat in scene.beats])
        return scene

    monkeypatch.setattr(runner, "run_scene", _capture_scene)

    results = asyncio.run(runner.run_episode())

    assert seen_max_takes == [[5, 5]]
    assert [
        [beat.max_takes for beat in scene.beats] for scene in results
    ] == [[5, 5]]
    assert [beat.max_takes for beat in load_scene(path).beats] == [5, 5]


def test_budget_exhaustion_halts_scene(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
        budget_usd=0.0, concurrency=1,    # immediately over cap
    )
    scene = runner.init_scene("SEQ11")
    # BudgetGuard.charge() with limit=0 hits ZeroDivisionError in its warn
    # threshold; mutate the internal counter directly to force the gate.
    runner.budget_guard._spent = 1.0
    with pytest.raises(BudgetExhaustedError):
        asyncio.run(runner.run_scene(scene))


def test_run_episode_batches_stamps_rendered_shot_on_budget_halt(
    tmp_path, monkeypatch
):
    """A BudgetExhaustedError raised from inside run_scene must propagate
    through the REAL run_episode_batches except (episode_runner.py:2565) and
    carry rendered_shot_ids stamped by the runner — NOT a test mock. Pins
    Phase 3a: monkeypatch the inner unit run_scene (the catch site's callee
    at 2556) so the live except executes."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    # MANDATORY: run_episode_batches calls _preflight_board_gate BEFORE
    # run_scene (2516/2548). With the board gate enabled it would raise a
    # board-gate error instead of reaching the budget path. Force it off.
    monkeypatch.setenv("RECOIL_BOARD_GATE", "0")
    from recoil.pipeline.core.workflow import Workflow, WorkflowStep

    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
        budget_usd=50.0, concurrency=1,
    )
    # Two batches: two shots at DIFFERENT locations so the clusterer emits two
    # single-shot groups. Scene 1 renders; scene 2 halts.
    shots = [
        _canonical_shot("EP001_SH01", 1),
        _canonical_shot("EP001_SH02", 2),
    ]
    shots[1] = dataclasses.replace(shots[1], location_id="LOC_B")
    canonical = CanonicalPlan(
        episode_id="EP001", project="p", shots=shots,
        source_path=Path("fixture_plan.json"),
    )

    # Capture the canonical shot id the runner assigns to the rendered beat
    # (from batch_summary.shot_ids — never hardcode; continuity stamps BATCH).
    rendered_shot_id_box: list[str] = []
    seen = []

    async def _run_scene(scene, *a, **k):
        seen.append(scene.scene_id)
        if len(seen) == 1:
            beat = scene.beats[0]
            rendered_shot_id_box.extend(
                beat.beat_metadata.get("batch_summary", {}).get(
                    "shot_ids", [beat.beat_id]
                )
            )
            beat.new_take(workflow=Workflow(
                workflow_id=f"{beat.beat_id}_take_0",
                steps=[WorkflowStep(step_id="video", modality="video_i2v",
                                    payload={"shot_id": beat.beat_id})],
            ))
            beat.takes[-1].status = "succeeded"
            return scene
        # Scene 2: budget exhausted before any take on this scene.
        raise BudgetExhaustedError(remaining=0.0)

    monkeypatch.setattr(runner, "run_scene", _run_scene)

    with pytest.raises(BudgetExhaustedError) as ei:
        asyncio.run(runner.run_episode_batches(canonical, dry_run=False))

    assert len(seen) == 2, (
        f"expected 2 batches/scenes, got {seen} — adjust the fixture so the "
        "clusterer emits two batches"
    )
    assert rendered_shot_id_box, "rendered beat had no batch_summary shot id"
    rendered_shot_id = rendered_shot_id_box[0]
    stamped = ei.value.rendered_shot_ids
    # Scene 1's beat rendered -> its shot id is stamped. Scene 2 raised before
    # any take -> nothing from it.
    assert rendered_shot_id in stamped
    assert stamped == [rendered_shot_id], (
        f"only the rendered beat's shot id must be stamped; got {stamped}"
    )


def test_budget_halt_partial_i2v_scene_stamps_only_rendered_beat(
    tmp_path, monkeypatch
):
    """A below-threshold i2v scene has one beat per shot. If a budget halt
    lands mid-scene (beat A rendered, beat B never dispatched), ONLY beat A's
    shot id may be stamped — never beat B's unrendered sibling. Pins the
    beat-level scope (the disk-reconcile failure class)."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    monkeypatch.setenv("RECOIL_BOARD_GATE", "0")
    from recoil.pipeline.core.workflow import Workflow, WorkflowStep

    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
        budget_usd=50.0, concurrency=1,
    )
    # TWO shots at the SAME location + adjacent scene_index so the clusterer
    # emits ONE below-threshold i2v batch = ONE scene with TWO beats.
    shots = [
        _canonical_shot("EP001_SH01", 1),
        _canonical_shot("EP001_SH02", 1),
    ]
    canonical = CanonicalPlan(
        episode_id="EP001", project="p", shots=shots,
        source_path=Path("fixture_plan.json"),
    )

    rendered_box: list[str] = []
    sibling_box: list[str] = []

    async def _run_scene(scene, *a, **k):
        # Must be ONE multi-beat scene for this gate to be meaningful.
        assert len(scene.beats) >= 2, (
            f"expected one multi-beat i2v scene, got beats="
            f"{[b.beat_id for b in scene.beats]} — adjust the fixture so the "
            "two same-location shots cluster into one below-threshold batch"
        )
        rendered_beat, sibling_beat = scene.beats[0], scene.beats[1]
        rendered_box.extend(
            rendered_beat.beat_metadata.get("batch_summary", {}).get(
                "shot_ids", [rendered_beat.beat_id]
            )
        )
        sibling_box.extend(
            sibling_beat.beat_metadata.get("batch_summary", {}).get(
                "shot_ids", [sibling_beat.beat_id]
            )
        )
        # Beat A renders a real take.
        rendered_beat.new_take(workflow=Workflow(
            workflow_id=f"{rendered_beat.beat_id}_take_0",
            steps=[WorkflowStep(step_id="video", modality="video_i2v",
                                payload={"shot_id": rendered_beat.beat_id})],
        ))
        rendered_beat.takes[-1].status = "succeeded"
        # Beat B never dispatched (would have been cancelled by the halt).
        # Halt now — beat B gained NO take.
        raise BudgetExhaustedError(remaining=0.0)

    monkeypatch.setattr(runner, "run_scene", _run_scene)

    with pytest.raises(BudgetExhaustedError) as ei:
        asyncio.run(runner.run_episode_batches(canonical, dry_run=False))

    assert rendered_box and sibling_box
    stamped = ei.value.rendered_shot_ids
    assert rendered_box[0] in stamped, (
        f"rendered beat's shot id missing from stamp: {stamped}"
    )
    assert sibling_box[0] not in stamped, (
        f"UNRENDERED sibling beat's shot id leaked into rendered_shot_ids: "
        f"{stamped} (beat-level over-broad regression)"
    )
    assert stamped == [rendered_box[0]], (
        f"only the rendered beat's shot id must be stamped; got {stamped}"
    )


def test_budget_preflight_halt_stamps_nothing(tmp_path, monkeypatch):
    """If BudgetExhaustedError fires from the per-beat budget PREFLIGHT
    (episode_runner.py:1725/1734) BEFORE new_take (1821), the halting scene
    rendered NOTHING and rendered_shot_ids must be []. Pins: the per-beat
    take-count delta, not scene/written-scene presence, is the signal."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    monkeypatch.setenv("RECOIL_BOARD_GATE", "0")
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
        budget_usd=0.0, concurrency=1,
    )
    # Match test_budget_exhaustion_halts_scene: limit=0 ZeroDivisions in the
    # warn threshold, so force the gate by mutating the spent counter.
    runner.budget_guard._spent = 1.0
    shots = [_canonical_shot("EP001_SH01", 1)]
    canonical = CanonicalPlan(
        episode_id="EP001", project="p", shots=shots,
        source_path=Path("fixture_plan.json"),
    )

    with pytest.raises(BudgetExhaustedError) as ei:
        asyncio.run(runner.run_episode_batches(canonical, dry_run=False))

    assert ei.value.rendered_shot_ids == [], (
        "a budget-preflight-halted scene (rendered nothing) leaked into "
        "rendered_shot_ids — regression"
    )


def test_budget_exhausted_error_declares_rendered_shot_ids_field():
    """rendered_shot_ids must be a real keyword-only __init__ field that
    defaults to [], NOT an ad-hoc dynamic attribute. Cross-module spend-path
    contract (the CLI reads it). Pins Phase 3a-0."""
    import inspect
    sig = inspect.signature(BudgetExhaustedError.__init__)
    assert "rendered_shot_ids" in sig.parameters, (
        "BudgetExhaustedError.__init__ does not declare rendered_shot_ids — "
        "it must be a typed field, not a dynamic attribute (Phase 3a-0)"
    )
    assert BudgetExhaustedError(remaining=0.0).rendered_shot_ids == []
    exc = BudgetExhaustedError(remaining=0.0, rendered_shot_ids=["S1", "S2"])
    assert exc.rendered_shot_ids == ["S1", "S2"]


def test_budget_halt_no_dispatch_scene_stamps_nothing(tmp_path, monkeypatch):
    """A scene that run_scene returns WITHOUT adding any take (loaded
    already-complete: empty eligible_beats at 2152 -> gather([]) -> zero new
    takes) must NOT contribute to rendered_shot_ids, even though
    scenes.append(scene) at 2568 runs unconditionally. Pins: the per-beat
    take-count delta — not the returned `scenes` list — is the signal."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    monkeypatch.setenv("RECOIL_BOARD_GATE", "0")
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
        budget_usd=50.0, concurrency=1,
    )
    shots = [
        _canonical_shot("EP001_SH01", 1),
        _canonical_shot("EP001_SH02", 2),
    ]
    shots[1] = dataclasses.replace(shots[1], location_id="LOC_B")
    canonical = CanonicalPlan(
        episode_id="EP001", project="p", shots=shots,
        source_path=Path("fixture_plan.json"),
    )

    seen = []

    async def _run_scene(scene, *a, **k):
        seen.append(scene.scene_id)
        if len(seen) == 1:
            return scene   # already complete — adds NO take.
        raise BudgetExhaustedError(remaining=0.0)

    monkeypatch.setattr(runner, "run_scene", _run_scene)

    with pytest.raises(BudgetExhaustedError) as ei:
        asyncio.run(runner.run_episode_batches(canonical, dry_run=False))

    assert len(seen) == 2, f"expected 2 batches/scenes, got {seen}"
    assert ei.value.rendered_shot_ids == [], (
        f"a no-dispatch (already-complete) scene leaked into "
        f"rendered_shot_ids; got {ei.value.rendered_shot_ids}"
    )


def test_budget_halt_build_error_stub_beat_stamps_nothing(tmp_path, monkeypatch):
    """A beat whose run_scene appended ONLY a build-error STUB take
    (take_metadata['build_error'] set, no dispatch — episode_runner.py:1780)
    rendered nothing and must NOT contribute to rendered_shot_ids, even though
    its raw take count grew. Pins: the signal is the NON-build-error take
    delta, not the raw take count."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    monkeypatch.setenv("RECOIL_BOARD_GATE", "0")
    from recoil.pipeline.core.workflow import Workflow, WorkflowStep

    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
        budget_usd=50.0, concurrency=1,
    )
    shots = [
        _canonical_shot("EP001_SH01", 1),
        _canonical_shot("EP001_SH02", 2),
    ]
    shots[1] = dataclasses.replace(shots[1], location_id="LOC_B")
    canonical = CanonicalPlan(
        episode_id="EP001", project="p", shots=shots,
        source_path=Path("fixture_plan.json"),
    )

    seen = []

    async def _run_scene(scene, *a, **k):
        seen.append(scene.scene_id)
        if len(seen) == 1:
            beat = scene.beats[0]
            t = beat.new_take(workflow=Workflow(
                workflow_id=f"{beat.beat_id}_take_0_buildfail",
                steps=[WorkflowStep(step_id="video", modality="video_i2v",
                                    payload={"shot_id": beat.beat_id})],
            ))
            t.status = "failed"
            t.take_metadata["build_error"] = "simulated payload build error"
            return scene
        raise BudgetExhaustedError(remaining=0.0)

    monkeypatch.setattr(runner, "run_scene", _run_scene)

    with pytest.raises(BudgetExhaustedError) as ei:
        asyncio.run(runner.run_episode_batches(canonical, dry_run=False))

    assert len(seen) == 2, f"expected 2 batches/scenes, got {seen}"
    assert ei.value.rendered_shot_ids == [], (
        f"a beat with only a build-error stub take (no dispatch) leaked into "
        f"rendered_shot_ids — raw take count used instead of the "
        f"non-build-error delta; got {ei.value.rendered_shot_ids}"
    )


def test_grouping_filename_scene_metadata_stamps_identity(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
    )
    shots = [
        _canonical_shot("EP001_SH01", 1),
        _canonical_shot("EP001_SH02", 2),
    ]
    batch = Batch(
        batch_id="BATCH_002",
        shots=shots,
        shared_location_id="LOC_A",
        total_duration_s=4.0,
    )

    scene = runner._scene_from_group(
        runner._group_from_batch(batch, fallback_ordinal=2)
    )

    assert scene.scene_id == "BATCH_002"
    assert scene.beats[0].beat_metadata["grouping"] == {
        "strategy": "continuity",
        "ordinal": 2,
        "shot_ids": ["EP001_SH01", "EP001_SH02"],
        "source_pass_id": None,
        "shotset_hash": shotset_hash(["EP001_SH01", "EP001_SH02"]),
    }


def test_below_threshold_i2v_fallback_beats_write_distinct_solo_stems(
    tmp_path,
    monkeypatch,
):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
    )
    shots = [
        _canonical_shot("EP001_SH01", 1),
        _canonical_shot("EP001_SH02", 2),
    ]
    batch = Batch(
        batch_id="BATCH_002",
        shots=shots,
        shared_location_id="LOC_A",
        total_duration_s=4.0,
        below_threshold=True,
    )
    scene = runner._scene_from_group(
        runner._group_from_batch(batch, fallback_ordinal=2)
    )

    assert [beat.beat_metadata["grouping"] for beat in scene.beats] == [
        {
            "strategy": "solo",
            "ordinal": 0,
            "shot_ids": ["EP001_SH01"],
            "source_pass_id": None,
            "shotset_hash": shotset_hash(["EP001_SH01"]),
        },
        {
            "strategy": "solo",
            "ordinal": 0,
            "shot_ids": ["EP001_SH02"],
            "source_pass_id": None,
            "shotset_hash": shotset_hash(["EP001_SH02"]),
        },
    ]

    start_frame = tmp_path / "start.png"
    start_frame.write_bytes(b"png")

    def _fake_payload(**kwargs):
        return {
            "shot_id": kwargs["shot"].shot_id,
            "prompt": "prompt",
            "model": "seeddance-2.0",
            "start_frame": str(start_frame),
            "duration": 2,
            "aspect_ratio": "9:16",
            "generate_audio": False,
            "grouping": dict(kwargs["grouping"]),
        }

    import recoil.pipeline._lib.dispatch_payload as dispatch_payload

    monkeypatch.setattr(dispatch_payload, "build_dispatch_payload", _fake_payload)

    video_dir = tmp_path / "renders"
    paths = SimpleNamespace(
        project="p",
        project_root=tmp_path,
        video_dir=video_dir,
        frames_dir=tmp_path / "frames",
        previs_dir=tmp_path / "previs",
    )

    class Store:
        def update_shot(self, *args, **kwargs):
            return None

        def get_shot(self, shot_id):
            return {}

        def append_take(self, shot_id, take_record):
            return 1

    class FakeVideoModelClient:
        def __init__(self, *args, **kwargs):
            pass

        def submit(self, payload):
            return SimpleNamespace(result=None)

        def wait_for_job(self, job, timeout_s, on_status=None):
            return GenerationResult(
                success=True,
                video_data=b"video",
                model="seeddance-2.0",
                cost=0.1,
                metadata={},
            )

    video_runner = VideoRunner(
        StepRunner(store=Store(), paths=paths, validate_frames=False)
    )

    with patch(
        "recoil.execution.video_model_client.VideoModelClient",
        FakeVideoModelClient,
    ):
        results = [
            video_runner.run(
                runner._build_workflow_for_beat(
                    beat,
                    take_index=0,
                    beat_index=index,
                ).steps[0].payload
            )
            for index, beat in enumerate(scene.beats)
        ]

    names = [Path(result.output_path).name for result in results]

    assert all(result.success for result in results)
    assert names == ["EP001_SH1_take1.mp4", "EP001_SH2_take1.mp4"]
    assert len({name.rsplit("_take", 1)[0] for name in names}) == 2
    assert all("_CONT_" not in name for name in names)


def test_scene_grouping_shotset_hash_persists_for_batch_and_solo(
    tmp_path,
    monkeypatch,
):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
    )
    shots = [
        _canonical_shot("EP001_SH01", 1),
        _canonical_shot("EP001_SH02", 2),
    ]
    expected_batch_hash = shotset_hash(["EP001_SH01", "EP001_SH02"])

    batch = Batch(
        batch_id="BATCH_002",
        shots=shots,
        shared_location_id="LOC_A",
        total_duration_s=4.0,
    )
    batch_scene = runner._scene_from_group(
        runner._group_from_batch(batch, fallback_ordinal=2)
    )
    batch_path = scene_path("p", "ep_001", batch_scene.scene_id)
    save_scene(batch_scene, batch_path)
    loaded_batch = load_scene(batch_path)

    batch_grouping = loaded_batch.beats[0].beat_metadata["grouping"]
    assert batch_grouping["shotset_hash"] == expected_batch_hash
    assert (
        loaded_batch.scene_metadata["grouping"]["shotset_hash"]
        == expected_batch_hash
    )

    reversed_batch = Batch(
        batch_id="BATCH_003",
        shots=list(reversed(shots)),
        shared_location_id="LOC_A",
        total_duration_s=4.0,
    )
    reversed_scene = runner._scene_from_group(
        runner._group_from_batch(reversed_batch, fallback_ordinal=3)
    )
    assert (
        reversed_scene.beats[0].beat_metadata["grouping"]["shotset_hash"]
        == expected_batch_hash
    )

    solo_batch = Batch(
        batch_id="BATCH_004",
        shots=shots,
        shared_location_id="LOC_A",
        total_duration_s=4.0,
        below_threshold=True,
    )
    solo_scene = runner._scene_from_group(
        runner._group_from_batch(solo_batch, fallback_ordinal=4)
    )
    solo_path = scene_path("p", "ep_001", solo_scene.scene_id)
    save_scene(solo_scene, solo_path)
    loaded_solo = load_scene(solo_path)

    assert [
        beat.beat_metadata["grouping"]["shotset_hash"]
        for beat in loaded_solo.beats
    ] == [
        shotset_hash(["EP001_SH01"]),
        shotset_hash(["EP001_SH02"]),
    ]


def test_shotset_hash_behavioral_gate_covers_all_board_bearing_units(
    tmp_path,
    monkeypatch,
):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
    )
    shots = [
        _canonical_shot("EP001_SH01", 1),
        _canonical_shot("EP001_SH02", 2),
        _canonical_shot("EP001_SH03", 3),
        _canonical_shot("EP001_SH04", 4),
        _canonical_shot("EP001_SH05", 5),
        _canonical_shot("EP001_SH06", 6),
    ]
    canonical = CanonicalPlan(
        episode_id="EP001",
        project="p",
        shots=shots,
        source_path=Path("fixture_plan.json"),
    )
    ctx = GroupingContext(
        project="p",
        episode=1,
        canonical_plan=canonical,
        selected_coverage_passes=[],
        tier_map={shot.shot_id: 2 for shot in shots},
        wildcard_override=None,
    )

    continuity_group = get_grouping("continuity").assemble(shots[:3], ctx)[0]
    solo_group = get_grouping("solo").assemble([shots[3]], ctx)[0]
    coverage_pass = CoveragePass(
        pass_id="EP001_PASS_021_SH01_02_A_ENV",
        episode_id="ep_001",
        shot_range=("EP001_SH01", "EP001_SH02"),
        camera_side="A",
        label="plain coverage",
        focus_character="",
        pass_type="env",
        segments=[
            CoverageSegment(0, "EP001_SH02", "MS", 2, "b"),
            CoverageSegment(1, "EP001_SH01", "MS", 2, "a"),
        ],
    )
    mixed_wildcard_pass = CoveragePass(
        pass_id="EP001_PASS_022_SH04_05_A_JADE",
        episode_id="ep_001",
        shot_range=("EP001_SH04", "EP001_SH05"),
        camera_side="A",
        label="mixed wildcard coverage",
        focus_character="JADE",
        pass_type="character",
        segments=[
            CoverageSegment(0, "EP001_SH04", "MS", 2, "real a"),
            CoverageSegment(1, "wildcard", "MS", 2, "wild", is_wildcard=True),
            CoverageSegment(2, "EP001_SH05", "MS", 2, "real b"),
        ],
        wildcard_enabled=True,
    )
    wildcard_only_pass = CoveragePass(
        pass_id="EP001_PASS_023_SH05_06_A_JADE",
        episode_id="ep_001",
        shot_range=("EP001_SH05", "EP001_SH06"),
        camera_side="A",
        label="wildcard-only coverage",
        focus_character="JADE",
        pass_type="character",
        segments=[
            CoverageSegment(0, "wildcard", "MS", 2, "wild", is_wildcard=True),
        ],
        wildcard_enabled=True,
    )
    coverage_passes = [coverage_pass, mixed_wildcard_pass, wildcard_only_pass]
    coverage_ctx = dataclasses.replace(
        ctx,
        selected_coverage_passes=coverage_passes,
    )
    coverage_groups = get_grouping("coverage").assemble(shots, coverage_ctx)

    assert continuity_group.modality == "r2v_multi"
    assert continuity_group.identity.strategy == "continuity"
    assert solo_group.modality == "video_i2v"

    scene_units = [
        (
            "r2v_multi batch",
            continuity_group,
            runner._scene_from_group(continuity_group),
        ),
        ("solo video_i2v beat", solo_group, runner._scene_from_group(solo_group)),
    ] + [
        (coverage_pass.pass_id, group, runner._scene_from_group(group))
        for coverage_pass, group in zip(coverage_passes, coverage_groups)
    ]

    first_hashes = {}
    second_hashes = {}
    for name, group, scene in scene_units:
        assert scene.beats, name
        for beat in scene.beats:
            grouping = beat.beat_metadata["grouping"]
            actual_hash = grouping["shotset_hash"]
            assert actual_hash
            assert actual_hash == shotset_hash(grouping["shot_ids"])
            assert actual_hash == dataclasses.replace(
                group.identity,
                shot_ids=list(reversed(group.identity.shot_ids)),
            ).to_dict()["shotset_hash"]
            first_hashes[f"scene:{name}:{beat.beat_id}"] = actual_hash

        rederived_scene = runner._scene_from_group(group)
        for beat in rederived_scene.beats:
            second_hashes[f"scene:{name}:{beat.beat_id}"] = (
                beat.beat_metadata["grouping"]["shotset_hash"]
            )

    for coverage_pass in coverage_passes:
        first_dict = coverage_pass.to_dict()
        second_dict = coverage_pass.to_dict()
        actual_hash = first_dict["shotset_hash"]
        assert actual_hash
        assert actual_hash == second_dict["shotset_hash"]
        assert actual_hash == dataclasses.replace(
            coverage_pass,
            segments=list(reversed(coverage_pass.segments)),
        ).to_dict()["shotset_hash"]
        first_hashes[f"pass:{coverage_pass.pass_id}"] = actual_hash
        second_hashes[f"pass:{coverage_pass.pass_id}"] = second_dict["shotset_hash"]

    assert (
        json.dumps(first_hashes, sort_keys=True).encode("utf-8")
        == json.dumps(second_hashes, sort_keys=True).encode("utf-8")
    )

    coverage_scene_by_pass_id = {
        scene.scene_id: scene
        for name, _group, scene in scene_units
        if name.startswith("EP001_PASS_")
    }
    for coverage_pass in coverage_passes:
        pass_hash = coverage_pass.to_dict()["shotset_hash"]
        beat_grouping = coverage_scene_by_pass_id[
            coverage_pass.pass_id
        ].beats[0].beat_metadata["grouping"]
        assert beat_grouping["shotset_hash"] == pass_hash

    mixed_grouping = coverage_scene_by_pass_id[
        mixed_wildcard_pass.pass_id
    ].beats[0].beat_metadata["grouping"]
    assert mixed_grouping["shot_ids"] == ["EP001_SH04", "EP001_SH05"]
    assert "wildcard" not in mixed_grouping["shot_ids"]
    assert mixed_wildcard_pass.to_dict()["shotset_hash"] == shotset_hash(
        ["EP001_SH04", "EP001_SH05"]
    )
    assert wildcard_only_pass.to_dict()["shotset_hash"] == shotset_hash(
        ["EP001_SH05", "EP001_SH06"]
    )


def test_single_shot_oner_keeps_identity_through_dispatch_and_writer(
    tmp_path,
    monkeypatch,
):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
    )
    shots = [_canonical_shot("EP001_SH05", 1)]
    canonical = CanonicalPlan(
        episode_id="EP001",
        project="p",
        shots=shots,
        source_path=Path("ep_001_plan.json"),
    )
    grouping_ctx = GroupingContext(
        project="p",
        episode=1,
        canonical_plan=canonical,
        selected_coverage_passes=[],
        tier_map={"EP001_SH05": 2},
        wildcard_override=None,
    )
    group = get_grouping("oner").assemble(shots, grouping_ctx)[0]
    scene = runner._scene_from_group(group)
    grouping = {
        "strategy": "oner",
        "ordinal": 1,
        "shot_ids": ["EP001_SH05"],
        "source_pass_id": None,
        "shotset_hash": shotset_hash(["EP001_SH05"]),
    }

    assert group.modality == "video_i2v"
    assert scene.scene_id == "ONER_001"
    assert scene.beats[0].beat_metadata["grouping"] == grouping

    start_frame = tmp_path / "start.png"
    start_frame.write_bytes(b"png")

    def _fake_payload(**kwargs):
        return {
            "shot_id": kwargs["shot"].shot_id,
            "prompt": "one shot oner",
            "model": "seeddance-2.0",
            "start_frame": str(start_frame),
            "duration": 4,
            "aspect_ratio": "9:16",
            "generate_audio": False,
            "grouping": dict(kwargs["grouping"]),
        }

    import recoil.pipeline._lib.dispatch_payload as dispatch_payload

    monkeypatch.setattr(dispatch_payload, "build_dispatch_payload", _fake_payload)

    workflow = runner._build_workflow_for_beat(
        scene.beats[0],
        take_index=0,
        beat_index=0,
    )
    payload = workflow.steps[0].payload

    assert payload["grouping"] == grouping
    assert workflow.global_provenance["grouping"] == grouping

    video_dir = tmp_path / "renders"
    paths = SimpleNamespace(
        project="p",
        project_root=tmp_path,
        video_dir=video_dir,
        frames_dir=tmp_path / "frames",
        previs_dir=tmp_path / "previs",
    )

    class Store:
        def update_shot(self, *args, **kwargs):
            return None

        def get_shot(self, shot_id):
            return {}

        def append_take(self, shot_id, take_record):
            return 1

    class FakeVideoModelClient:
        def __init__(self, *args, **kwargs):
            pass

        def submit(self, payload):
            return SimpleNamespace(result=None)

        def wait_for_job(self, job, timeout_s, on_status=None):
            return GenerationResult(
                success=True,
                video_data=b"video",
                model="seeddance-2.0",
                cost=0.1,
                metadata={},
            )

    video_runner = VideoRunner(
        StepRunner(store=Store(), paths=paths, validate_frames=False)
    )

    with patch(
        "recoil.execution.video_model_client.VideoModelClient",
        FakeVideoModelClient,
    ):
        result = video_runner.run(payload)

    output_name = Path(result.output_path).name
    sidecar = (video_dir / output_name).with_suffix(".mp4.json")
    sidecar_data = json.loads(sidecar.read_text())

    assert result.success is True
    assert "_ONER_001_" in output_name
    assert output_name != "EP001_SH5_take1.mp4"
    assert sidecar_data["provenance"]["grouping"] == grouping


def test_grouping_filename_workflow_payload_uses_grouping_metadata(
    tmp_path,
    monkeypatch,
):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
    )
    shot = _canonical_shot("EP001_SH01")
    grouping = {
        "strategy": "coverage",
        "ordinal": 11,
        "shot_ids": ["EP001_SH01", "EP001_SH02"],
        "source_pass_id": "EP001_PASS_011_SH01_02_A_JADE",
        "shotset_hash": shotset_hash(["EP001_SH01", "EP001_SH02"]),
    }
    beat = Beat(
        beat_id="EP001_PASS_011_SH01_02_A_JADE__cov",
        beat_metadata={
            "scene_id": "EP001_PASS_011_SH01_02_A_JADE",
            "shot": dataclasses.asdict(shot),
            "batch_shots": [dataclasses.asdict(shot)],
            "modality": "r2v_multi",
            "grouping": grouping,
        },
    )
    captured = {}

    def _fake_payload(**kwargs):
        captured.update(kwargs)
        builder_grouping = kwargs["grouping"]
        return {
            "shot_id": kwargs["shot"].shot_id,
            "prompt": "prompt",
            "segment_shot_ids": ["EP001_SH01", "EP001_SH02"],
            "expected_segment_timestamps": [(0.0, 1.0), (1.0, 2.0)],
            "reference_images": [],
            "grouping": dict(builder_grouping),
            "output_filename": (
                f"EP001_COV_{builder_grouping['ordinal']:03d}_SH01_02_take1.mp4"
            ),
        }

    import recoil.pipeline._lib.dispatch_payload as dispatch_payload

    monkeypatch.setattr(dispatch_payload, "build_dispatch_payload", _fake_payload)

    wf = runner._build_workflow_for_beat(beat, take_index=0, beat_index=0)
    payload = wf.steps[0].payload

    assert payload["grouping"] == grouping
    assert captured["grouping"] == grouping
    assert payload["output_filename"] == "EP001_COV_011_SH01_02_take1.mp4"
    assert wf.global_provenance["grouping"] == grouping
    assert "pass_counter" not in payload


def test_grouping_runner_uses_registry_for_solo(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001", concurrency=1,
    )
    shots = [
        _canonical_shot("EP001_SH01", 1),
        _canonical_shot("EP001_SH02", 2),
    ]
    canonical = type(
        "CanonicalPlanStub",
        (),
        {"shots": shots, "raw": _plan()},
    )()

    async def _noop(scene, **kwargs):
        return scene

    monkeypatch.setattr(runner, "run_scene", _noop)

    scenes = asyncio.run(
        runner.run_episode_batches(canonical, dry_run=True, grouping="solo")
    )

    assert [scene.scene_id for scene in scenes] == ["EP001_SH01", "EP001_SH02"]
    assert [
        scene.scene_metadata["grouping"] for scene in scenes
    ] == [
        {
            "strategy": "solo",
            "ordinal": 0,
            "shot_ids": ["EP001_SH01"],
            "source_pass_id": None,
            "shotset_hash": shotset_hash(["EP001_SH01"]),
        },
        {
            "strategy": "solo",
            "ordinal": 0,
            "shot_ids": ["EP001_SH02"],
            "source_pass_id": None,
            "shotset_hash": shotset_hash(["EP001_SH02"]),
        },
    ]


def test_grouping_runner_uses_selected_coverage_pass_identity(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001", concurrency=1,
    )
    shots = [
        _canonical_shot("EP001_SH01", 1),
        _canonical_shot("EP001_SH02", 2),
    ]
    canonical = type(
        "CanonicalPlanStub",
        (),
        {"shots": shots, "raw": _plan()},
    )()
    selected = CoveragePass(
        pass_id="EP001_PASS_011_SH01_02_A_ENV",
        episode_id="ep_001",
        shot_range=("EP001_SH01", "EP001_SH02"),
        camera_side="A",
        label="fixture",
        focus_character="",
        pass_type="env",
        segments=[
            CoverageSegment(0, "EP001_SH01", "MS", 2, "a"),
            CoverageSegment(1, "EP001_SH02", "MS", 2, "b"),
        ],
    )

    async def _noop(scene, **kwargs):
        return scene

    monkeypatch.setattr(runner, "run_scene", _noop)

    scenes = asyncio.run(
        runner.run_episode_batches(
            canonical,
            dry_run=True,
            grouping="coverage",
            selected_coverage_passes=[selected],
        )
    )

    assert len(scenes) == 1
    assert scenes[0].scene_id == "EP001_PASS_011_SH01_02_A_ENV"
    assert scenes[0].beats[0].beat_metadata["grouping"] == {
        "strategy": "coverage",
        "ordinal": 11,
        "shot_ids": ["EP001_SH01", "EP001_SH02"],
        "source_pass_id": "EP001_PASS_011_SH01_02_A_ENV",
        "shotset_hash": shotset_hash(["EP001_SH01", "EP001_SH02"]),
    }


def test_grouping_refresh_reused_coverage_scene_stages_pass_config_candidate_without_dispatch(
    tmp_path,
    monkeypatch,
    capsys,
):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001", concurrency=1,
    )
    shots = [
        _canonical_shot("EP001_SH01", 1),
        _canonical_shot("EP001_SH02", 2),
    ]
    canonical = type(
        "CanonicalPlanStub",
        (),
        {"shots": shots, "raw": _plan()},
    )()
    pass_id = "EP001_PASS_011_SH01_02_A_ENV"
    generation_config = {
        "tier": "pro",
        "seed": 1234,
        "resolution": "1080p",
    }
    element_config = {
        "identity_ref_mode": "full_turnaround",
        "character_elements": [{"char_id": "JADE"}],
    }
    selected = CoveragePass(
        pass_id=pass_id,
        episode_id="ep_001",
        shot_range=("EP001_SH01", "EP001_SH02"),
        camera_side="A",
        label="fixture",
        focus_character="",
        pass_type="env",
        segments=[
            CoverageSegment(0, "EP001_SH01", "MS", 2, "a"),
            CoverageSegment(1, "EP001_SH02", "MS", 2, "b"),
        ],
        generation_config=generation_config,
        element_config=element_config,
    )
    path = scene_path("p", "ep_001", pass_id)
    stale_video = tmp_path / "stale.mp4"
    stale_video.write_bytes(b"video")
    stale_receipt = GenerationReceipt(
        receipt_id="rcpt_stale",
        modality="r2v_multi",
        caller_id="test",
        project="p",
        episode=1,
        shot_id="EP001_SH01",
        timestamp_utc=utc_now_iso8601(),
        run_result=RunResult(
            id="stale",
            modality="r2v_multi",
            output_path=str(stale_video),
            success=True,
        ),
    )
    stale_take = Take(
        f"{pass_id}__cov_take_0",
        0,
        Workflow(
            workflow_id="stale",
            steps=[
                WorkflowStep(
                    step_id="video",
                    modality="r2v_multi",
                    payload={},
                    status="succeeded",
                    receipt=stale_receipt,
                )
            ],
        ),
        status="succeeded",
    )
    stale = Scene(
        scene_id=pass_id,
        beats=[
            Beat(
                beat_id=f"{pass_id}__cov",
                takes=[stale_take],
                beat_metadata={
                    "scene_id": pass_id,
                    "shot": dataclasses.asdict(shots[0]),
                    "batch_shots": [dataclasses.asdict(s) for s in shots],
                    "modality": "r2v_multi",
                    "grouping": {
                        "strategy": "coverage",
                        "ordinal": 1,
                        "shot_ids": ["EP001_SH01"],
                        "source_pass_id": pass_id,
                    },
                    "generation_config": {"tier": "stale"},
                    "element_config": {"identity_ref_mode": "stale"},
                },
            )
        ],
    )
    stale.beats[0].select_primary()
    save_scene(stale, path)
    original_active_version = 1
    prior_version_count = 1
    run_scene_calls = []

    async def _capture_dispatch(scene, **kwargs):
        run_scene_calls.append(scene.scene_id)
        return scene

    monkeypatch.setattr(runner, "run_scene", _capture_dispatch)

    asyncio.run(runner.run_episode_batches(
        canonical,
        dry_run=False,
        grouping="coverage",
        selected_coverage_passes=[selected],
    ))
    output = capsys.readouterr().out

    active_scene, loaded_version = load_scene_active_with_version("p", "ep_001", pass_id)
    active_beat = active_scene.beats[0]
    manifest = load_manifest("p", "ep_001", pass_id)
    assert manifest is not None
    assert loaded_version == original_active_version
    assert manifest["active_version"] == original_active_version
    assert len(manifest["versions"]) == prior_version_count + 1

    candidate_entry = next(
        entry for entry in manifest["versions"]
        if entry["version"] != manifest["active_version"]
    )
    assert candidate_entry["downstream"] == "not_derived"
    candidate_scene = load_scene(
        scene_version_path("p", "ep_001", pass_id, candidate_entry["version"])
    )
    candidate_beat = candidate_scene.beats[0]
    expected_grouping = {
        "strategy": "coverage",
        "ordinal": 11,
        "shot_ids": ["EP001_SH01", "EP001_SH02"],
        "source_pass_id": pass_id,
        "shotset_hash": shotset_hash(["EP001_SH01", "EP001_SH02"]),
    }
    assert candidate_beat.beat_metadata["grouping"] == expected_grouping
    assert candidate_beat.beat_metadata["generation_config"] == generation_config
    assert candidate_beat.beat_metadata["element_config"] == element_config
    assert candidate_beat.takes == []
    assert candidate_beat.primary_take_id is None

    old_grouping = {
        "strategy": "coverage",
        "ordinal": 1,
        "shot_ids": ["EP001_SH01"],
        "source_pass_id": pass_id,
    }
    assert active_beat.beat_metadata["grouping"] == old_grouping
    assert active_beat.beat_metadata["generation_config"] == {"tier": "stale"}
    assert active_beat.beat_metadata["element_config"] == {"identity_ref_mode": "stale"}
    assert [take.take_id for take in active_beat.takes] == [stale_take.take_id]
    assert active_beat.primary_take_id == stale_take.take_id
    assert run_scene_calls == []

    flat_beat = load_scene(path).beats[0]
    assert flat_beat.beat_metadata["grouping"] == old_grouping
    assert flat_beat.beat_metadata["generation_config"] == {"tier": "stale"}
    assert flat_beat.beat_metadata["element_config"] == {"identity_ref_mode": "stale"}
    assert [take.take_id for take in flat_beat.takes] == [stale_take.take_id]

    assert f"appended candidate version v{candidate_entry['version']}" in output
    assert "active remains v1" in output
    assert "not_derived" in output
    assert "rederive --conform" in output

    # REC-231 (audit CRITICAL): a run that only STAGED a not_derived candidate (no
    # dispatch, active version unmoved) must NOT stamp stages.scenes as derived — that
    # would falsely mark the freshness SSOT fresh for an inactive candidate. _stamp_
    # scenes_if_complete must skip when candidate_versions is populated on a non-derive run.
    drv = derivation_manifest.load("p", 1)
    assert "scenes" not in (drv.get("stages") or {})

    version_count = len(manifest["versions"])
    asyncio.run(runner.run_episode_batches(
        canonical,
        dry_run=False,
        grouping="coverage",
        selected_coverage_passes=[selected],
    ))
    rerun_output = capsys.readouterr().out
    rerun_manifest = load_manifest("p", "ep_001", pass_id)
    assert rerun_manifest is not None
    assert rerun_manifest["active_version"] == original_active_version
    assert len(rerun_manifest["versions"]) == version_count
    assert "candidate already exists" in rerun_output
    assert "active remains v1" in rerun_output


def test_scene_persistence_round_trips_grouping_and_pass_configs(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    grouping = {
        "strategy": "coverage",
        "ordinal": 11,
        "shot_ids": ["EP001_SH01", "EP001_SH02"],
        "source_pass_id": "EP001_PASS_011_SH01_02_A_ENV",
        "shotset_hash": shotset_hash(["EP001_SH01", "EP001_SH02"]),
    }
    generation_config = {"tier": "pro", "seed": 1234}
    element_config = {"identity_ref_mode": "full_turnaround"}
    scene = Scene(
        scene_id="EP001_PASS_011_SH01_02_A_ENV",
        beats=[
            Beat(
                beat_id="EP001_PASS_011_SH01_02_A_ENV__cov",
                beat_metadata={
                    "grouping": grouping,
                    "generation_config": generation_config,
                    "element_config": element_config,
                },
            )
        ],
    )
    path = scene_path("p", "ep_001", scene.scene_id)

    save_scene(scene, path)
    loaded = load_scene(path)

    metadata = loaded.beats[0].beat_metadata
    assert metadata["grouping"] == grouping
    assert metadata["generation_config"] == generation_config
    assert metadata["element_config"] == element_config


def test_grouping_coverage_registry_preserves_pass_configs_in_payload(
    tmp_path,
    monkeypatch,
):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001", concurrency=1,
    )
    shots = [
        _canonical_shot("EP001_SH01", 1),
        _canonical_shot("EP001_SH02", 2),
    ]
    canonical = type(
        "CanonicalPlanStub",
        (),
        {"shots": shots, "raw": _plan()},
    )()
    generation_config = {
        "tier": "pro",
        "seed": 1234,
        "resolution": "1080p",
        "format_mode": "C",
        "anchor_duration_s": 8,
        "regen_previz_for_segment": True,
    }
    element_config = {
        "location_id": "LOC_A",
        "character_elements": [{"char_id": "JADE"}],
    }
    selected = CoveragePass(
        pass_id="EP001_PASS_011_SH01_02_A_JADE",
        episode_id="ep_001",
        shot_range=("EP001_SH01", "EP001_SH02"),
        camera_side="A",
        label="fixture",
        focus_character="JADE",
        pass_type="character",
        segments=[
            CoverageSegment(0, "EP001_SH01", "MS", 2, "a"),
            CoverageSegment(1, "EP001_SH02", "MS", 2, "b"),
        ],
        generation_config=generation_config,
        element_config=element_config,
    )

    async def _noop(scene, **kwargs):
        return scene

    monkeypatch.setattr(runner, "run_scene", _noop)

    scenes = asyncio.run(
        runner.run_episode_batches(
            canonical,
            dry_run=True,
            grouping="coverage",
            selected_coverage_passes=[selected],
        )
    )
    beat = scenes[0].beats[0]
    assert beat.beat_metadata["generation_config"] == generation_config
    assert beat.beat_metadata["element_config"] == element_config
    assert beat.beat_metadata["batch_summary"]["shot_ids"] == [
        "EP001_SH01",
        "EP001_SH02",
    ]

    def _fake_payload(**kwargs):
        return {
            "shot_id": kwargs["shot"].shot_id,
            "prompt": "prompt",
            "tier": "fast",
            "seed": 99,
            "resolution": "720p",
            "provider_hints": {"keep": "value"},
            "element_config": {"existing": True},
            "segment_shot_ids": ["EP001_SH01", "EP001_SH02"],
            "expected_segment_timestamps": [(0.0, 1.0), (1.0, 2.0)],
            "reference_images": [],
        }

    import recoil.pipeline._lib.dispatch_payload as dispatch_payload

    monkeypatch.setattr(dispatch_payload, "build_dispatch_payload", _fake_payload)

    wf = runner._build_workflow_for_beat(beat, take_index=0, beat_index=0)
    payload = wf.steps[0].payload

    for key, value in generation_config.items():
        assert payload[key] == value
    assert payload["provider_hints"]["keep"] == "value"
    assert payload["provider_hints"]["tier"] == "pro"
    assert payload["provider_hints"]["seed"] == 1234
    assert payload["element_config"] == {
        "existing": True,
        "location_id": "LOC_A",
        "character_elements": [{"char_id": "JADE"}],
    }


# ---------------------------------------------------------------------------
# REC-122 — budget reservation leak + phantom-cost accounting
# ---------------------------------------------------------------------------


def test_build_failure_releases_budget_reservation(tmp_path, monkeypatch):
    """Every PayloadBuildError take must release its would_exceed reservation.

    REC-122 regression: 6 build failures (missing start frames) each leaked
    a full reservation; the accumulated phantom holds later false-fired
    BudgetExhaustedError with real headroom remaining.
    """
    from recoil.pipeline.orchestrator.episode_runner import PayloadBuildError

    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
        budget_usd=10.0, concurrency=1, max_takes=3,
    )
    scene = runner.init_scene("SEQ11")

    monkeypatch.setattr(
        EpisodeRunner, "_estimate_take_cost", lambda self, beat: 1.5
    )

    def _boom(self, beat, take_index, beat_index=None, grouping_override=None, **kwargs):
        raise PayloadBuildError(
            beat_id=beat.beat_id, cause=RuntimeError("start frame missing")
        )

    monkeypatch.setattr(EpisodeRunner, "_build_workflow_for_beat", _boom)

    asyncio.run(runner.run_scene(scene, dry_run=False))

    # 2 beats x 3 takes = 6 build failures; without the release fix the
    # guard would be carrying 6 x $1.50 = $9.00 of dead reservations.
    assert runner.budget_guard.reserved == pytest.approx(0.0)
    assert runner.budget_guard.spent == pytest.approx(0.0)


def test_active_version_move_after_running_presave_rejects_before_video_dispatch(
    tmp_path, monkeypatch
):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
        budget_usd=50.0, concurrency=1, max_takes=1,
        step_runner=SimpleNamespace(),
    )
    scene = Scene(
        scene_id="SEQ11",
        beats=[Beat(beat_id="EP001_SH01", max_takes=1)],
        scene_metadata={"episode": "ep_001", "project": "p"},
    )
    save_scene(scene, scene_path("p", "ep_001", "SEQ11"))
    store = SceneVersionStore("p", "ep_001")
    store.write_scene_candidate(
        "SEQ11",
        Scene(
            scene_id="SEQ11",
            beats=[Beat(beat_id="EP001_SH01", max_takes=1)],
            scene_metadata={"episode": "ep_001", "project": "p"},
        ),
    )
    scene, loaded_version = load_scene_active_with_version("p", "ep_001", "SEQ11")
    moved = False
    original_persist = runner._persist_active_status

    def _persist_then_move(active_scene, *, expected_version, dry_run=False):
        nonlocal moved
        original_persist(
            active_scene,
            expected_version=expected_version,
            dry_run=dry_run,
        )
        if not moved:
            store.conform("SEQ11", 2)
            moved = True

    def _wf(self, beat, take_index, beat_index=None, grouping_override=None, **kwargs):
        return Workflow(
            workflow_id=f"{beat.beat_id}_take_{take_index}",
            steps=[WorkflowStep(step_id="video", modality="video_i2v",
                                payload={"shot_id": beat.beat_id})],
            global_provenance={"shot_id": beat.beat_id},
        )

    dispatch_calls = []

    def _paid_execute(self, context=None):
        dispatch_calls.append(self.take_id)
        raise RuntimeError("paid dispatch must not run")

    monkeypatch.setattr(runner, "_persist_active_status", _persist_then_move)
    monkeypatch.setattr(EpisodeRunner, "_build_workflow_for_beat", _wf)
    monkeypatch.setattr(Take, "execute", _paid_execute)

    with pytest.raises(SceneVersionConflictError):
        asyncio.run(runner._dispatch_one_beat(
            scene.beats[0],
            scene,
            expected_version=loaded_version,
            dry_run=False,
        ))

    assert dispatch_calls == []


def test_failed_take_cost_charges_as_failed_but_billed_not_succeeded(
    tmp_path, monkeypatch
):
    """A failed take whose receipt carries provider-reported cost must be
    charged kind="failed_but_billed" — never kind="succeeded" (REC-122:
    phantom estimates on billing-rejected runs were charged as succeeded
    spend, inflating the tally)."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
        budget_usd=50.0, concurrency=1, max_takes=1,
        step_runner=SimpleNamespace(),
    )
    scene = runner.init_scene("SEQ11")
    monkeypatch.setattr(
        EpisodeRunner, "_estimate_take_cost", lambda self, beat: 3.15
    )

    from recoil.pipeline.core.workflow import Workflow, WorkflowStep

    def _wf(self, beat, take_index, beat_index=None, grouping_override=None, **kwargs):
        return Workflow(
            workflow_id=f"{beat.beat_id}_take_{take_index}",
            steps=[WorkflowStep(step_id="video", modality="video_i2v",
                                payload={"shot_id": beat.beat_id})],
            global_provenance={"shot_id": beat.beat_id},
        )

    monkeypatch.setattr(EpisodeRunner, "_build_workflow_for_beat", _wf)

    from recoil.pipeline.core.take import Take

    def _fail_billed(self, context=None):
        self.status = "failed"
        for step in self.workflow.steps:
            step.status = "failed"
            step.receipt = SimpleNamespace(
                run_result=SimpleNamespace(
                    success=False,
                    error="provider reported FAILED",
                    metadata={"cost_usd": 3.15},
                ),
                to_dict=lambda: {},
            )

    monkeypatch.setattr(Take, "execute", _fail_billed)

    asyncio.run(runner.run_scene(scene, dry_run=False))

    kinds = [e["kind"] for e in runner.budget_guard.events]
    assert kinds, "expected at least one charge event"
    assert all(k == "failed_but_billed" for k in kinds), kinds
    assert runner.budget_guard.reserved == pytest.approx(0.0)


def test_mixed_success_and_failed_billed_steps_charge_both(
    tmp_path, monkeypatch
):
    """A take with one successful step AND one failed-but-billed step must
    charge BOTH buckets (gate finding on REC-122: the mutually-exclusive
    charge block silently dropped the billed failure)."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    runner = EpisodeRunner(
        project="p", plan=_plan(), casting={}, episode="ep_001",
        budget_usd=50.0, concurrency=1, max_takes=1,
        step_runner=SimpleNamespace(),
    )
    scene = runner.init_scene("SEQ11")
    monkeypatch.setattr(
        EpisodeRunner, "_estimate_take_cost", lambda self, beat: 5.0
    )

    from recoil.pipeline.core.workflow import Workflow, WorkflowStep

    def _wf(self, beat, take_index, beat_index=None, grouping_override=None, **kwargs):
        return Workflow(
            workflow_id=f"{beat.beat_id}_take_{take_index}",
            steps=[
                WorkflowStep(step_id="kf", modality="image_t2i",
                             payload={"shot_id": beat.beat_id}),
                WorkflowStep(step_id="video", modality="video_i2v",
                             payload={"shot_id": beat.beat_id}),
            ],
            global_provenance={"shot_id": beat.beat_id},
        )

    monkeypatch.setattr(EpisodeRunner, "_build_workflow_for_beat", _wf)

    from recoil.pipeline.core.take import Take

    def _mixed(self, context=None):
        self.status = "partial"
        ok, fail = self.workflow.steps
        ok.status = "succeeded"
        ok.receipt = SimpleNamespace(
            run_result=SimpleNamespace(
                success=True, error=None, metadata={"cost_usd": 1.0}
            ),
            to_dict=lambda: {},
        )
        fail.status = "failed"
        fail.receipt = SimpleNamespace(
            run_result=SimpleNamespace(
                success=False,
                error="content_policy_violation",
                metadata={"cost_usd": 0.0, "inference_billed_usd": 2.5},
            ),
            to_dict=lambda: {},
        )

    monkeypatch.setattr(Take, "execute", _mixed)

    asyncio.run(runner.run_scene(scene, dry_run=False))

    kinds = sorted(e["kind"] for e in runner.budget_guard.events)
    assert "succeeded" in kinds and "failed_but_billed" in kinds, kinds
    assert runner.budget_guard.spent == pytest.approx(
        (1.0 + 2.5) * len(scene.beats)
    )
    assert runner.budget_guard.reserved == pytest.approx(0.0)
