from __future__ import annotations

import asyncio
import copy
import dataclasses
import inspect
import json
import sys
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock

import pytest

from recoil.core.naming import build_filename
from recoil.execution.step_types import PassResult
from recoil.pipeline.cli import generate
from recoil.pipeline._lib.plan_loader import CanonicalPlan, CanonicalShot, CharacterEntry
from recoil.pipeline.core.persistence import save_scene, scene_path
from recoil.pipeline.core.take import Scene
from recoil.pipeline.orchestrator.episode_runner import (
    EpisodeRunner,
    RerollPreflightError,
)
from recoil.pipeline.orchestrator.tests.test_reroll_new_take import (
    _FakeStepRunner,
    _runner,
    _shot,
    _succeeded_beat,
    _write_cli_project,
    _write_location_ref,
)


@pytest.fixture(autouse=True)
def _isolate_projects_root(tmp_path, monkeypatch):
    root = tmp_path / "projects"
    root.mkdir()
    (root / ".recoil-data-root").touch()
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(root))


def _plan(shots) -> CanonicalPlan:
    return CanonicalPlan(
        episode_id="ep_001",
        project="fixture",
        shots=list(shots),
        source_path=Path("fixture.json"),
        raw={"shots": []},
    )


def _character_shot(shot_id: str, scene_index: int) -> CanonicalShot:
    base = _shot(shot_id, scene_index)
    raw = {
        **base.raw,
        "camera_side": "A",
        "screen_direction": "left-to-right",
        "asset_data": {
            "location_id": "L1",
            "characters": [{"char_id": "JADE"}],
        },
        "prompt_data": {
            "shot_type": "MS",
            "prompt_skeleton": {"action_line": f"Jade crosses frame in {shot_id}."},
        },
        "routing_data": {"target_editorial_duration_s": 2.0},
    }
    return dataclasses.replace(
        base,
        characters=[CharacterEntry(char_id="JADE")],
        is_env_only=False,
        raw=raw,
    )


def _character_shots() -> list[CanonicalShot]:
    return [
        _character_shot("EP001_SH23", 1),
        _character_shot("EP001_SH24", 2),
        _character_shot("EP001_SH25", 3),
    ]


def _character_succeeded_beat(video_dir: Path):
    beat = _succeeded_beat(video_dir)
    shots = _character_shots()
    beat.beat_metadata["shot"] = dataclasses.asdict(shots[0])
    beat.beat_metadata["batch_shots"] = [dataclasses.asdict(s) for s in shots]
    return beat


def _single_shot_succeeded_beat(video_dir: Path, shot_id: str = "EP001_SH22"):
    beat = _succeeded_beat(video_dir)
    shot = _shot(shot_id, 0)
    video = video_dir / build_filename(episode=1, shot_ids=[shot_id], take=1)
    video.parent.mkdir(parents=True, exist_ok=True)
    video.write_bytes(b"non-target-primary")
    beat.beat_id = f"PASS_011__{shot_id}"
    beat.beat_metadata["modality"] = "video_i2v"
    beat.beat_metadata["shot"] = dataclasses.asdict(shot)
    beat.beat_metadata.pop("batch_shots", None)
    step = beat.primary_take.workflow.steps[0]
    step.modality = "video_i2v"
    step.payload = {"shot_id": shot_id}
    step.receipt.run_result.output_path = str(video)
    return beat


class _PassCounterAwareStepRunner(_FakeStepRunner):
    def execute_pass(self, **kwargs):
        self.submit({"params": {}})
        if self.fail:
            return PassResult(
                pass_id=kwargs["pass_id"],
                success=False,
                video_path=None,
                cost_usd=0.0,
                model=kwargs.get("model", ""),
                error="provider failed",
                take_index=kwargs.get("forced_take_number") or -1,
            )
        take_num = int(kwargs.get("forced_take_number") or 1)
        segment_shot_ids = list(kwargs["segment_shot_ids"])
        path = self.video_dir / build_filename(
            episode=1,
            pass_counter=int(kwargs["pass_counter"]),
            shot_ids=segment_shot_ids,
            take=take_num,
        )
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(self.artifact)
        return PassResult(
            pass_id=kwargs["pass_id"],
            success=True,
            video_path=str(path),
            cost_usd=0.0,
            model=kwargs.get("model", ""),
            take_index=take_num,
            expected_cuts=max(0, len(segment_shot_ids) - 1),
        )


def _pass_take_path(video_dir: Path, take: int, pass_counter: int = 1) -> Path:
    return video_dir / build_filename(
        episode=1,
        pass_counter=pass_counter,
        shot_ids=["EP001_SH23", "EP001_SH24", "EP001_SH25"],
        take=take,
    )


def _write_character_ref(project: str = "fixture", char_id: str = "JADE") -> Path:
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.core.ref_resolver import slugify_asset_id

    subject = slugify_asset_id(char_id)
    ref = (
        CoreProjectPaths.for_project(project).asset_subject_dir("char", subject)
        / f"{subject}_identity_hero_v01.png"
    )
    ref.parent.mkdir(parents=True, exist_ok=True)
    ref.write_bytes(b"stable-character-ref")
    return ref


def _assert_directed_prose_character_fixture(shots: list[CanonicalShot]) -> None:
    from recoil.pipeline._lib.author_strategies import resolve_strategy
    from recoil.pipeline._lib.dispatch_payload import PayloadContext
    from recoil.pipeline._lib.shot_primitive import primitive_from_payload_context

    ctx = PayloadContext(
        project="fixture",
        modality="r2v_multi",
        shot_id=shots[0].shot_id,
        shot=shots[0],
        batch_shots=shots,
        duration_s=6.0,
        bible={},
    )
    primitive = primitive_from_payload_context(
        ctx,
        ref_manifest={"identity_1": 1, "scene_1": 2},
        segment_timestamps=[(0.0, 2.0), (2.0, 4.0), (4.0, 6.0)],
    )
    modality, strategy = resolve_strategy(
        primitive,
        model_id="seeddance-2.0",
        requested_modality="r2v_multi",
    )
    assert modality == "r2v_multi"
    assert strategy.name == "directed_prose"


def _authored_directed_prose() -> str:
    return (
        "[0:00-0:02] Jade crosses frame left-to-right as the camera holds "
        "a medium profile.\n"
        "[0:02-0:04] Jade passes the midpoint while the lens keeps her "
        "shoulders centered.\n"
        "[0:04-0:06] Jade exits frame as the camera settles on the cleared "
        "space."
    )


def test_new_take_preserves_loaded_legacy_r2v_beat_id(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    beat.beat_id = "EP001_PASS_011_SH23_24_25_A_JADE__cov"
    scene = Scene(scene_id="PASS_011", beats=[beat])
    save_scene(scene, scene_path("fixture", "ep_001", "PASS_011"))
    runner = _runner(_FakeStepRunner(video_dir, submit))

    scenes = asyncio.run(
        runner.run_episode_batches(
            _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]),
            force_single_batch=True,
            pass_id="PASS_011",
            force_new_take=True,
        )
    )

    assert scenes[0].beats[0].beat_id == "EP001_PASS_011_SH23_24_25_A_JADE__cov"
    assert len(scenes[0].beats[0].takes) == 2
    receipt = scenes[0].beats[0].takes[-1].workflow.steps[0].receipt
    assert receipt is not None
    assert Path(receipt.run_result.output_path).exists()
    assert submit.call_count == 1


def test_new_take_dispatches_approved_target(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    beat.approved = True
    save_scene(
        Scene(scene_id="PASS_011", beats=[beat]),
        scene_path("fixture", "ep_001", "PASS_011"),
    )
    runner = _runner(_FakeStepRunner(video_dir, submit))

    scenes = asyncio.run(
        runner.run_episode_batches(
            _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]),
            force_single_batch=True,
            pass_id="PASS_011",
            force_new_take=True,
        )
    )

    rerolled = scenes[0].beats[0]
    assert rerolled.approved is True
    assert len(rerolled.takes) == 2
    assert (video_dir / "EP001_PASS_001_SH23_24_25_take2.mp4").exists()
    assert submit.call_count == 1


def test_new_take_uses_primary_pass_counter_when_loaded_scene_has_prior_non_target(
    tmp_path,
):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    non_target = _single_shot_succeeded_beat(video_dir)
    target = _succeeded_beat(video_dir)
    save_scene(
        Scene(scene_id="PASS_011", beats=[non_target, target]),
        scene_path("fixture", "ep_001", "PASS_011"),
    )
    runner = _runner(_PassCounterAwareStepRunner(video_dir, submit))

    scenes = asyncio.run(
        runner.run_episode_batches(
            _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]),
            force_single_batch=True,
            pass_id="PASS_011",
            force_new_take=True,
        )
    )

    assert [beat.beat_id for beat in scenes[0].beats] == [
        non_target.beat_id,
        target.beat_id,
    ]
    assert _pass_take_path(video_dir, 2, pass_counter=1).exists()
    assert not _pass_take_path(video_dir, 2, pass_counter=2).exists()
    assert submit.call_count == 1


def test_new_take_rejects_non_target_empty_stored_nonempty_fresh_refs_before_dispatch_or_persist(
    tmp_path,
):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    _write_location_ref(location_id="L2")
    non_target = _single_shot_succeeded_beat(video_dir, "EP001_SH22")
    non_target_shot = dataclasses.replace(
        _shot("EP001_SH22", 4),
        location_id="L2",
    )
    non_target.beat_metadata["shot"] = dataclasses.asdict(non_target_shot)
    non_target.beat_metadata["inputs_fingerprint"] = ""
    target = _succeeded_beat(video_dir)
    scene = Scene(scene_id="PASS_011", beats=[non_target, target])
    path = scene_path("fixture", "ep_001", "PASS_011")
    save_scene(scene, path)
    before = path.read_text()
    runner = _runner(_FakeStepRunner(video_dir, submit))
    fresh_payload = runner._build_reroll_preflight_payload(non_target)
    fresh_refs = fresh_payload.get("reference_images") or []
    assert fresh_refs
    fresh_fp = EpisodeRunner._compute_inputs_fingerprint(fresh_refs)
    assert fresh_fp

    normal_scene = copy.deepcopy(scene)
    assert runner.revalidate_succeeded_fingerprints(normal_scene, mutate=True) == 0
    assert normal_scene.beats[0].primary_take.status == "succeeded"
    assert normal_scene.beats[0].primary_take_id == non_target.primary_take_id
    assert normal_scene.beats[0].beat_metadata["inputs_fingerprint"] == fresh_fp

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_episode_batches(
                _plan(
                    [
                        _shot("EP001_SH23", 1),
                        _shot("EP001_SH24", 2),
                        _shot("EP001_SH25", 3),
                    ]
                ),
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert exc.value.error_code == "reroll_refs_drifted"
    submit.assert_not_called()
    assert path.read_text() == before


def test_new_take_collision_uses_primary_pass_counter_with_prior_non_target(
    tmp_path,
):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    non_target = _single_shot_succeeded_beat(video_dir)
    target = _succeeded_beat(video_dir)
    collision = _pass_take_path(video_dir, 2, pass_counter=1)
    collision.write_bytes(b"existing-reroll")
    save_scene(
        Scene(scene_id="PASS_011", beats=[non_target, target]),
        scene_path("fixture", "ep_001", "PASS_011"),
    )
    runner = _runner(_PassCounterAwareStepRunner(video_dir, submit))

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_episode_batches(
                _plan(
                    [
                        _shot("EP001_SH23", 1),
                        _shot("EP001_SH24", 2),
                        _shot("EP001_SH25", 3),
                    ]
                ),
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert exc.value.error_code == "reroll_collision"
    assert collision.read_bytes() == b"existing-reroll"
    assert not _pass_take_path(video_dir, 2, pass_counter=2).exists()
    submit.assert_not_called()


def test_new_take_pass_identity_requires_primary_artifact_pass_token_no_provenance_rescue(
    tmp_path,
):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    primary = beat.primary_take
    assert primary is not None
    primary_video = Path(primary.workflow.steps[0].receipt.run_result.output_path)
    passless_video = video_dir / build_filename(
        episode=1,
        shot_ids=["EP001_SH23", "EP001_SH24", "EP001_SH25"],
        take=1,
    )
    passless_video.write_bytes(primary_video.read_bytes())
    primary_video.unlink()
    primary.workflow.steps[0].receipt.run_result.output_path = str(passless_video)
    primary.workflow.global_provenance["pass_counter"] = 1
    save_scene(
        Scene(scene_id="PASS_011", beats=[beat]),
        scene_path("fixture", "ep_001", "PASS_011"),
    )
    runner = _runner(_PassCounterAwareStepRunner(video_dir, submit))

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_episode_batches(
                _plan(
                    [
                        _shot("EP001_SH23", 1),
                        _shot("EP001_SH24", 2),
                        _shot("EP001_SH25", 3),
                    ]
                ),
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert exc.value.error_code == "reroll_segment_drift"
    assert "could not derive PASS identity from the primary take artifact" in str(
        exc.value
    )
    assert not _pass_take_path(video_dir, 2, pass_counter=1).exists()
    submit.assert_not_called()


def test_cli_second_reroll_success_ignores_historical_failed_new_take(
    tmp_path,
    monkeypatch,
):
    from recoil.execution.step_types import ProjectPaths

    _write_cli_project()
    paths = ProjectPaths.for_episode("fixture", 1)
    beat = _succeeded_beat(paths.video_dir)
    original_primary = beat.primary_take_id
    save_scene(
        Scene(scene_id="PASS_011", beats=[beat]),
        scene_path("fixture", "ep_001", "PASS_011"),
    )

    first_submit = MagicMock()
    first_runner = _FakeStepRunner(paths.video_dir, first_submit, fail=True)
    monkeypatch.setattr(generate, "StepRunner", lambda store, paths, episode=None: first_runner)
    first = generate.run_generation(
        project="fixture",
        episode=1,
        pass_ids=["PASS_011"],
        force_new_take=True,
    )
    after_first = Scene.from_dict(
        json.loads(scene_path("fixture", "ep_001", "PASS_011").read_text())
    )

    assert first["success"] is False
    assert first["shots_succeeded"] == 0
    assert first["shots_failed"] == 1
    assert len(after_first.beats[0].takes) == 2
    assert after_first.beats[0].takes[-1].status == "failed"
    assert after_first.beats[0].takes[-1].take_metadata["force_new_take"] is True
    assert after_first.beats[0].primary_take_id == original_primary

    second_submit = MagicMock()
    second_runner = _FakeStepRunner(paths.video_dir, second_submit)
    monkeypatch.setattr(generate, "StepRunner", lambda store, paths, episode=None: second_runner)
    second = generate.run_generation(
        project="fixture",
        episode=1,
        pass_ids=["PASS_011"],
        force_new_take=True,
    )
    after_second = Scene.from_dict(
        json.loads(scene_path("fixture", "ep_001", "PASS_011").read_text())
    )

    assert second["success"] is True
    assert second["shots_succeeded"] == 1
    assert second["shots_failed"] == 0
    assert len(after_second.beats[0].takes) == 3
    assert after_second.beats[0].takes[-1].status == "succeeded"
    assert after_second.beats[0].takes[-1].take_metadata["force_new_take"] is True
    assert _pass_take_path(paths.video_dir, 3).exists()
    assert first_submit.call_count == 1
    assert second_submit.call_count == 1


def test_run_scene_intrinsic_reroll_gate_rejects_invalid_target_before_dispatch(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    beat.beat_metadata["inputs_fingerprint"] = EpisodeRunner._compute_inputs_fingerprint(
        ["/refs/old-location.png"]
    )
    runner = _runner(_FakeStepRunner(video_dir, submit))

    assert "_reroll_preflight_token" not in inspect.signature(runner.run_scene).parameters

    with pytest.raises(RerollPreflightError):
        asyncio.run(
            runner.run_scene(
                Scene(scene_id="PASS_011", beats=[beat]),
                force_new_take=True,
                reroll_beat_id=None,
            )
        )

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_scene(
                Scene(scene_id="PASS_011", beats=[beat]),
                force_new_take=True,
                reroll_beat_id=beat.beat_id,
            )
        )

    assert exc.value.error_code == "reroll_refs_drifted"
    submit.assert_not_called()


def test_run_scene_new_take_rejects_multiple_r2v_multi_beats_before_dispatch(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    target = _succeeded_beat(video_dir)
    extra = copy.deepcopy(target)
    extra.beat_id = "PASS_011__extra_cov"
    runner = _runner(_FakeStepRunner(video_dir, submit))

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_scene(
                Scene(scene_id="PASS_011", beats=[target, extra]),
                force_new_take=True,
                reroll_beat_id=target.beat_id,
            )
        )

    assert exc.value.error_code == "new_take_requires_single_r2v_multi_beat"
    submit.assert_not_called()


def test_run_scene_new_take_rejects_non_target_phantom_before_dispatch(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    target = _succeeded_beat(video_dir)
    other = _succeeded_beat(video_dir)
    other.beat_id = "OTHER__cov"
    other.beat_metadata["modality"] = "video_i2v"
    other.beat_metadata.pop("batch_shots", None)
    other_video = video_dir / "OTHER_take1.mp4"
    other_video.write_bytes(b"non-target-original")
    other.primary_take.workflow.steps[0].modality = "video_i2v"
    other.primary_take.workflow.steps[0].receipt.run_result.output_path = str(other_video)
    other_video.unlink()
    scene = Scene(scene_id="PASS_011", beats=[target, other])
    runner = _runner(_FakeStepRunner(video_dir, submit))

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_scene(
                scene,
                force_new_take=True,
                reroll_beat_id=target.beat_id,
            )
        )

    assert exc.value.error_code == "reroll_requires_succeeded_primary"
    assert [beat.beat_id for beat in scene.beats] == [target.beat_id, other.beat_id]
    submit.assert_not_called()


def test_cli_failed_reroll_does_not_create_execution_store_shots_dir(
    tmp_path,
    monkeypatch,
):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    (CoreProjectPaths.for_project("_fixture").project_root).mkdir(exist_ok=True)
    project = "_fixture"
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps(
            [
                {
                    "pass_id": "PASS_011",
                    "segments": [
                        {"source_shot_id": "EP001_SH23"},
                        {"source_shot_id": "EP001_SH24"},
                        {"source_shot_id": "EP001_SH25"},
                    ],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                }
            ]
        ),
        encoding="utf-8",
    )

    beat = _succeeded_beat(paths.video_dir)
    primary_video = Path(beat.primary_take.workflow.steps[0].receipt.run_result.output_path)
    primary_video.unlink()
    save_scene(Scene(scene_id="PASS_011", beats=[beat]), scene_path(project, "ep_001", "PASS_011"))

    shots_dir = CoreProjectPaths.for_project(project).shots_dir
    assert not shots_dir.exists()

    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )
    monkeypatch.setattr(generate, "validate_all_passes", lambda passes: [])
    monkeypatch.setattr(generate, "get_provider_cost_per_second", lambda model: 0.0)
    monkeypatch.setattr(
        generate,
        "load_plan",
        lambda path: _plan(
            [_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]
        ),
    )
    monkeypatch.setattr(
        generate,
        "StepRunner",
        lambda store, paths, episode=None: _FakeStepRunner(paths.video_dir, MagicMock()),
    )
    monkeypatch.setattr(generate, "LearningEngine", lambda project: None)
    monkeypatch.setattr(generate, "StrategyEngine", lambda learning, model: None)

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        force_new_take=True,
    )

    assert result["success"] is False
    assert result["error"] == "reroll_requires_succeeded_primary"
    assert not shots_dir.exists()


def test_cli_new_take_rejects_duplicate_pass_records_before_dispatch(
    tmp_path,
    monkeypatch,
):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    project = "_fixture_dup"
    (CoreProjectPaths.for_project(project).project_root).mkdir(exist_ok=True)
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    pass_record = {
        "pass_id": "PASS_011",
        "segments": [
            {"source_shot_id": "EP001_SH23"},
            {"source_shot_id": "EP001_SH24"},
            {"source_shot_id": "EP001_SH25"},
        ],
        "duration_s": 2.0,
        "takes_count": 1,
        "element_config": {"location_id": "L1"},
    }
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps([pass_record, dict(pass_record)]),
        encoding="utf-8",
    )
    submit = MagicMock()
    step_runner_ctor = MagicMock(
        return_value=_FakeStepRunner(paths.video_dir, submit)
    )

    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )
    monkeypatch.setattr(generate, "StepRunner", step_runner_ctor)

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        force_new_take=True,
    )

    assert result["success"] is False
    assert result["error"] == "new_take_requires_single_pass"
    step_runner_ctor.assert_not_called()
    submit.assert_not_called()


def test_cli_normal_pass_filter_validates_unselected_blocking_pass(monkeypatch):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    project = "_fixture_normal_validation_scope"
    (CoreProjectPaths.for_project(project).project_root).mkdir(exist_ok=True)
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps(
            [
                {
                    "pass_id": "PASS_011",
                    "segments": [{"source_shot_id": "EP001_SH23"}],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                },
                {
                    "pass_id": "PASS_012",
                    "segments": [{"source_shot_id": "EP001_SH24"}],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                },
            ]
        ),
        encoding="utf-8",
    )

    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )

    def validate(passes):
        pass_ids = {p.pass_id for p in passes}
        if "PASS_012" not in pass_ids:
            return []
        return [
            SimpleNamespace(
                severity=generate.Severity.BLOCK,
                pass_id="PASS_012",
                check="forced_block",
                message="forced unselected block",
            )
        ]

    monkeypatch.setattr(generate, "validate_all_passes", validate)
    monkeypatch.setattr(generate, "get_provider_cost_per_second", lambda model: 0.0)

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        dry_run=True,
        force_new_take=False,
    )

    assert result["success"] is False
    assert result["error"] == "validation_blocked"
    assert result["blocks"] == [
        {
            "pass_id": "PASS_012",
            "check": "forced_block",
            "message": "forced unselected block",
        }
    ]


def test_cli_new_take_pass_filter_ignores_unselected_blocking_pass(monkeypatch):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    project = "_fixture_new_take_validation_scope"
    (CoreProjectPaths.for_project(project).project_root).mkdir(exist_ok=True)
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps(
            [
                {
                    "pass_id": "PASS_011",
                    "segments": [
                        {"source_shot_id": "EP001_SH23"},
                        {"source_shot_id": "EP001_SH24"},
                        {"source_shot_id": "EP001_SH25"},
                    ],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                },
                {
                    "pass_id": "PASS_012",
                    "segments": [{"source_shot_id": "EP001_SH26"}],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                },
            ]
        ),
        encoding="utf-8",
    )
    beat = _succeeded_beat(paths.video_dir)
    save_scene(
        Scene(scene_id="PASS_011", beats=[beat]),
        scene_path(project, "ep_001", "PASS_011"),
    )
    validated_pass_ids = []

    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )

    def validate(passes):
        ids = [p.pass_id for p in passes]
        validated_pass_ids.append(ids)
        if "PASS_012" not in ids:
            return []
        return [
            SimpleNamespace(
                severity=generate.Severity.BLOCK,
                pass_id="PASS_012",
                check="forced_block",
                message="forced unselected block",
            )
        ]

    monkeypatch.setattr(generate, "validate_all_passes", validate)
    monkeypatch.setattr(generate, "get_provider_cost_per_second", lambda model: 0.0)
    monkeypatch.setattr(
        generate,
        "load_plan",
        lambda path: _plan(
            [_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]
        ),
    )

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        dry_run=True,
        force_new_take=True,
    )

    assert result["success"] is True
    assert result["dry_run"] is True
    assert validated_pass_ids == [["PASS_011"]]


def test_cli_failed_reroll_preflight_does_not_create_missing_video_dir(
    tmp_path,
    monkeypatch,
):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    project = "_fixture_no_video_dir"
    (CoreProjectPaths.for_project(project).project_root).mkdir(exist_ok=True)
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps(
            [
                {
                    "pass_id": "PASS_011",
                    "segments": [
                        {"source_shot_id": "EP001_SH23"},
                        {"source_shot_id": "EP001_SH24"},
                        {"source_shot_id": "EP001_SH25"},
                    ],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                }
            ]
        ),
        encoding="utf-8",
    )

    beat = _succeeded_beat(paths.video_dir)
    primary_video = Path(beat.primary_take.workflow.steps[0].receipt.run_result.output_path)
    primary_video.unlink()
    paths.video_dir.rmdir()
    save_scene(Scene(scene_id="PASS_011", beats=[beat]), scene_path(project, "ep_001", "PASS_011"))
    assert not paths.video_dir.exists()
    submit = MagicMock()

    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )
    monkeypatch.setattr(generate, "validate_all_passes", lambda passes: [])
    monkeypatch.setattr(generate, "get_provider_cost_per_second", lambda model: 0.0)
    monkeypatch.setattr(
        generate,
        "load_plan",
        lambda path: _plan(
            [_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]
        ),
    )
    monkeypatch.setattr(
        generate,
        "StepRunner",
        lambda store, paths, episode=None: _FakeStepRunner(paths.video_dir, submit),
    )
    monkeypatch.setattr(generate, "LearningEngine", lambda project: None)
    monkeypatch.setattr(generate, "StrategyEngine", lambda learning, model: None)

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        force_new_take=True,
    )

    assert result["success"] is False
    assert result["error"] == "reroll_requires_succeeded_primary"
    assert not paths.video_dir.exists()
    submit.assert_not_called()


def test_cli_failed_reroll_preflight_does_not_create_learning_ops_or_output_dirs(
    tmp_path,
    monkeypatch,
):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    project = "_fixture_no_preflight_side_effects"
    core_paths = CoreProjectPaths.for_project(project)
    core_paths.project_root.mkdir(exist_ok=True)
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    paths.plans_dir.mkdir(parents=True, exist_ok=True)
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps(
            [
                {
                    "pass_id": "PASS_011",
                    "segments": [
                        {"source_shot_id": "EP001_SH23"},
                        {"source_shot_id": "EP001_SH24"},
                        {"source_shot_id": "EP001_SH25"},
                    ],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                }
            ]
        ),
        encoding="utf-8",
    )

    beat = _succeeded_beat(paths.video_dir)
    primary_video = Path(beat.primary_take.workflow.steps[0].receipt.run_result.output_path)
    primary_video.unlink()
    paths.video_dir.rmdir()
    save_scene(Scene(scene_id="PASS_011", beats=[beat]), scene_path(project, "ep_001", "PASS_011"))
    submit = MagicMock()
    ops_log_path = tmp_path / "_dispatch_logs" / "ops.log.jsonl"
    monkeypatch.setenv("RECOIL_OPS_LOG_PATH", str(ops_log_path))

    assert not core_paths.learning_dir.exists()
    assert not core_paths.shots_dir.exists()
    assert not paths.video_dir.exists()
    assert not ops_log_path.exists()

    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )
    monkeypatch.setattr(generate, "validate_all_passes", lambda passes: [])
    monkeypatch.setattr(generate, "get_provider_cost_per_second", lambda model: 0.0)
    monkeypatch.setattr(
        generate,
        "load_plan",
        lambda path: _plan(
            [_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]
        ),
    )
    monkeypatch.setattr(
        generate,
        "StepRunner",
        lambda store, paths, episode=None: _FakeStepRunner(paths.video_dir, submit),
    )

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        force_new_take=True,
    )

    assert result["success"] is False
    assert result["error"] == "reroll_requires_succeeded_primary"
    assert not core_paths.learning_dir.exists()
    assert not core_paths.shots_dir.exists()
    assert not paths.video_dir.exists()
    assert not ops_log_path.exists()
    submit.assert_not_called()


def test_new_take_rejects_phantom_succeeded_primary_before_dispatch_or_mutation(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    primary_video = Path(beat.primary_take.workflow.steps[0].receipt.run_result.output_path)
    primary_video.unlink()
    path = scene_path("fixture", "ep_001", "PASS_011")
    save_scene(Scene(scene_id="PASS_011", beats=[beat]), path)
    before = path.read_text()
    runner = _runner(_FakeStepRunner(video_dir, submit))

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_episode_batches(
                _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]),
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert exc.value.error_code == "reroll_requires_succeeded_primary"
    submit.assert_not_called()
    assert path.read_text() == before


def test_new_take_rejects_segment_drift_before_dispatch_or_mutation(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    path = scene_path("fixture", "ep_001", "PASS_011")
    save_scene(Scene(scene_id="PASS_011", beats=[beat]), path)
    before = path.read_text()
    runner = _runner(_FakeStepRunner(video_dir, submit))

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_episode_batches(
                _plan([
                    _shot("EP001_SH23", 1),
                    _shot("EP001_SH24", 2),
                    _shot("EP001_SH26", 3),
                ]),
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert exc.value.error_code == "reroll_segment_drift"
    submit.assert_not_called()
    assert path.read_text() == before


def test_new_take_rejects_segment_drift_from_primary_artifact_after_metadata_refresh(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    refreshed_shots = [
        _shot("EP001_SH23", 1),
        _shot("EP001_SH24", 2),
        _shot("EP001_SH26", 3),
    ]
    beat.beat_metadata["segment_shot_ids"] = [shot.shot_id for shot in refreshed_shots]
    beat.beat_metadata["batch_shots"] = [
        dataclasses.asdict(shot) for shot in refreshed_shots
    ]
    path = scene_path("fixture", "ep_001", "PASS_011")
    save_scene(Scene(scene_id="PASS_011", beats=[beat]), path)
    before = path.read_text()
    runner = _runner(_FakeStepRunner(video_dir, submit))

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_episode_batches(
                _plan(refreshed_shots),
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert exc.value.error_code == "reroll_segment_drift"
    submit.assert_not_called()
    assert path.read_text() == before


def test_new_take_rejects_non_target_phantom_without_persisting(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    target = _succeeded_beat(video_dir)
    other = _succeeded_beat(video_dir)
    other.beat_id = "OTHER__cov"
    other.beat_metadata["modality"] = "video_i2v"
    other.beat_metadata.pop("batch_shots", None)
    other_video = video_dir / "OTHER_take1.mp4"
    other_video.write_bytes(b"non-target-original")
    other.primary_take.workflow.steps[0].modality = "video_i2v"
    other.primary_take.workflow.steps[0].receipt.run_result.output_path = str(other_video)
    other_video.unlink()
    path = scene_path("fixture", "ep_001", "PASS_011")
    save_scene(Scene(scene_id="PASS_011", beats=[target, other]), path)
    before = path.read_text()
    runner = _runner(_FakeStepRunner(video_dir, submit))

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_episode_batches(
                _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]),
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert exc.value.error_code == "reroll_requires_succeeded_primary"
    submit.assert_not_called()
    assert path.read_text() == before


def test_new_take_rejects_non_target_without_shot_metadata_before_dispatch_or_persist(
    tmp_path,
):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    target = _succeeded_beat(video_dir)
    other = _single_shot_succeeded_beat(video_dir, "EP001_SH22")
    other.beat_metadata.pop("shot", None)
    scene = Scene(scene_id="PASS_011", beats=[target, other])
    path = scene_path("fixture", "ep_001", "PASS_011")
    save_scene(scene, path)
    before = path.read_text()
    runner = _runner(_FakeStepRunner(video_dir, submit))

    assert runner.revalidate_succeeded_fingerprints(
        Scene(scene_id="PASS_011", beats=[other]),
        mutate=True,
    ) == 0

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_scene(
                scene,
                force_new_take=True,
                reroll_beat_id=target.beat_id,
            )
        )

    assert exc.value.error_code == "reroll_refs_drifted"
    assert "Could not verify non-target succeeded beat integrity" in str(exc.value)
    submit.assert_not_called()
    assert len(target.takes) == 1
    assert other.primary_take.status == "succeeded"
    assert path.read_text() == before


def test_new_take_rejects_non_target_payload_build_error_before_dispatch_or_persist(
    tmp_path,
    monkeypatch,
):
    from recoil.pipeline._lib import dispatch_payload

    submit = MagicMock()
    video_dir = tmp_path / "video"
    target = _succeeded_beat(video_dir)
    other = _single_shot_succeeded_beat(video_dir, "EP001_SH22")
    scene = Scene(scene_id="PASS_011", beats=[target, other])
    path = scene_path("fixture", "ep_001", "PASS_011")
    save_scene(scene, path)
    before = path.read_text()
    runner = _runner(_FakeStepRunner(video_dir, submit))
    original_build_dispatch_payload = dispatch_payload.build_dispatch_payload

    def fail_for_non_target(*args, **kwargs):
        shot = kwargs.get("shot")
        if getattr(shot, "shot_id", None) == "EP001_SH22":
            raise ValueError("unresolvable non-target refs")
        return original_build_dispatch_payload(*args, **kwargs)

    monkeypatch.setattr(
        dispatch_payload,
        "build_dispatch_payload",
        fail_for_non_target,
    )

    assert runner.revalidate_succeeded_fingerprints(
        Scene(scene_id="PASS_011", beats=[other]),
        mutate=True,
    ) == 0

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_scene(
                scene,
                force_new_take=True,
                reroll_beat_id=target.beat_id,
            )
        )

    assert exc.value.error_code == "reroll_refs_drifted"
    assert "Could not verify non-target succeeded beat integrity" in str(exc.value)
    submit.assert_not_called()
    assert len(target.takes) == 1
    assert other.primary_take.status == "succeeded"
    assert path.read_text() == before


def test_episode_runner_constructor_persists_no_scene_json_for_normal_plan():
    plan = {"sequences": {"PASS_011": {"shots": [{"shot_id": "EP001_SH23"}]}}}

    runner = EpisodeRunner(
        project="fixture",
        plan=plan,
        max_takes=5,
        budget_usd=50.0,
        concurrency=1,
        episode="ep_001",
        step_runner=MagicMock(),
    )

    assert runner.plan == plan
    assert not scene_path("fixture", "ep_001", "PASS_011").exists()


def test_direct_run_scene_new_take_invalid_target_persists_no_constructor_scene_json(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    plan = {"sequences": {"PASS_011": {"shots": [{"shot_id": "EP001_SH23"}]}}}
    runner = EpisodeRunner(
        project="fixture",
        plan=plan,
        max_takes=5,
        budget_usd=50.0,
        concurrency=1,
        episode="ep_001",
        step_runner=_FakeStepRunner(video_dir, submit),
    )
    path = scene_path("fixture", "ep_001", "PASS_011")
    assert not path.exists()

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_scene(
                Scene(scene_id="PASS_011", beats=[beat]),
                force_new_take=True,
                reroll_beat_id=None,
            )
        )

    assert exc.value.error_code == "reroll_preflight_required"
    assert not path.exists()
    submit.assert_not_called()


def test_direct_run_scene_new_take_rejects_segment_drift_before_dispatch(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    drifted_shots = [
        _shot("EP001_SH23", 1),
        _shot("EP001_SH24", 2),
        _shot("EP001_SH26", 3),
    ]
    beat.beat_metadata["shot"] = dataclasses.asdict(drifted_shots[0])
    beat.beat_metadata["batch_shots"] = [
        dataclasses.asdict(shot) for shot in drifted_shots
    ]
    scene = Scene(scene_id="PASS_011", beats=[beat])
    runner = _runner(_FakeStepRunner(video_dir, submit))

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_scene(
                scene,
                force_new_take=True,
                reroll_beat_id=beat.beat_id,
            )
        )

    assert exc.value.error_code == "reroll_segment_drift"
    submit.assert_not_called()


def test_direct_run_scene_new_take_dry_run_is_read_only_for_valid_reroll(
    tmp_path,
    monkeypatch,
):
    from recoil.pipeline._lib import dispatch_payload

    submit = MagicMock()
    video_dir = tmp_path / "video"
    _write_character_ref()
    _write_location_ref()
    beat = _character_succeeded_beat(video_dir)
    runner = _runner(_FakeStepRunner(video_dir, submit))
    payload = runner._build_reroll_preflight_payload(beat)
    beat.beat_metadata["inputs_fingerprint"] = EpisodeRunner._compute_inputs_fingerprint(
        payload.get("reference_images") or []
    )
    scene = Scene(scene_id="PASS_011", beats=[beat])
    path = scene_path("fixture", "ep_001", "PASS_011")
    save_scene(scene, path)
    before_json = path.read_text(encoding="utf-8")
    before_take_count = len(beat.takes)

    author_pass = MagicMock(side_effect=AssertionError("author_pass must not run"))
    opus = MagicMock(side_effect=AssertionError("call_opus_oauth must not run"))
    monkeypatch.setattr(dispatch_payload, "author_pass", author_pass)
    monkeypatch.setattr(dispatch_payload, "call_opus_oauth", opus)

    returned = asyncio.run(
        runner.run_scene(
            scene,
            dry_run=True,
            force_new_take=True,
            reroll_beat_id=beat.beat_id,
        )
    )

    assert returned is scene
    assert len(beat.takes) == before_take_count
    assert path.read_text(encoding="utf-8") == before_json
    submit.assert_not_called()
    author_pass.assert_not_called()
    opus.assert_not_called()


def test_cli_new_take_dry_run_missing_primary_returns_reroll_error_without_dispatch_or_persist(
    monkeypatch,
):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    project = "_fixture_dry_run_missing_primary"
    (CoreProjectPaths.for_project(project).project_root).mkdir(exist_ok=True)
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps(
            [
                {
                    "pass_id": "PASS_011",
                    "segments": [
                        {"source_shot_id": "EP001_SH23"},
                        {"source_shot_id": "EP001_SH24"},
                        {"source_shot_id": "EP001_SH25"},
                    ],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                }
            ]
        ),
        encoding="utf-8",
    )
    step_runner_ctor = MagicMock()
    execution_store_ctor = MagicMock()

    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )
    monkeypatch.setattr(generate, "validate_all_passes", lambda passes: [])
    monkeypatch.setattr(generate, "get_provider_cost_per_second", lambda model: 0.0)
    monkeypatch.setattr(
        generate,
        "load_plan",
        lambda path: _plan(
            [_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]
        ),
    )
    monkeypatch.setattr(generate, "StepRunner", step_runner_ctor)
    monkeypatch.setattr(generate, "ExecutionStore", execution_store_ctor)

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        dry_run=True,
        force_new_take=True,
    )

    assert result["success"] is False
    assert result["error"] == "reroll_requires_succeeded_primary"
    assert not scene_path(project, "ep_001", "PASS_011").exists()
    assert not paths.video_dir.exists()
    step_runner_ctor.assert_not_called()
    execution_store_ctor.assert_not_called()


def test_cli_new_take_dry_run_zero_resolved_shots_returns_reroll_error_without_dispatch(
    monkeypatch,
):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    project = "_fixture_dry_run_zero_resolved_shots"
    CoreProjectPaths.for_project(project).project_root.mkdir(exist_ok=True)
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps(
            [
                {
                    "pass_id": "PASS_011",
                    "segments": [{"source_shot_id": "EP001_SH99"}],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                }
            ]
        ),
        encoding="utf-8",
    )
    step_runner_ctor = MagicMock()
    execution_store_ctor = MagicMock()

    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )
    monkeypatch.setattr(generate, "validate_all_passes", lambda passes: [])
    monkeypatch.setattr(generate, "get_provider_cost_per_second", lambda model: 0.0)
    monkeypatch.setattr(
        generate,
        "load_plan",
        lambda path: _plan([_shot("EP001_SH23", 1)]),
    )
    monkeypatch.setattr(generate, "StepRunner", step_runner_ctor)
    monkeypatch.setattr(generate, "ExecutionStore", execution_store_ctor)

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        dry_run=True,
        force_new_take=True,
    )

    assert result["success"] is False
    assert result["error"] == "reroll_requires_succeeded_primary"
    assert "could not resolve any shots" in result["message"]
    assert not scene_path(project, "ep_001", "PASS_011").exists()
    assert not paths.video_dir.exists()
    step_runner_ctor.assert_not_called()
    execution_store_ctor.assert_not_called()


def test_cli_new_take_live_zero_resolved_shots_cleans_transient_passes_dir(
    monkeypatch,
):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    project = "_fixture_live_zero_resolved_shots"
    CoreProjectPaths.for_project(project).project_root.mkdir(exist_ok=True)
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps(
            [
                {
                    "pass_id": "PASS_011",
                    "segments": [{"source_shot_id": "EP001_SH99"}],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                }
            ]
        ),
        encoding="utf-8",
    )
    lock_parent = CoreProjectPaths.from_root(paths.project_root).passes_dir
    submit = MagicMock()
    step_runner = _FakeStepRunner(paths.video_dir, submit)

    assert not lock_parent.exists()

    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )
    monkeypatch.setattr(generate, "validate_all_passes", lambda passes: [])
    monkeypatch.setattr(generate, "get_provider_cost_per_second", lambda model: 0.0)
    monkeypatch.setattr(
        generate,
        "load_plan",
        lambda path: _plan([_shot("EP001_SH23", 1)]),
    )
    monkeypatch.setattr(
        generate,
        "ExecutionStore",
        lambda project, migrate=True: SimpleNamespace(),
    )
    monkeypatch.setattr(generate, "StepRunner", lambda store, paths, episode=None: step_runner)
    monkeypatch.setattr(generate, "LearningEngine", lambda project: None)
    monkeypatch.setattr(generate, "StrategyEngine", lambda learning, model: None)

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        force_new_take=True,
    )

    assert result["success"] is False
    assert result["error"] == "reroll_requires_succeeded_primary"
    assert "could not resolve any shots" in result["message"]
    submit.assert_not_called()
    assert not scene_path(project, "ep_001", "PASS_011").exists()
    assert not paths.video_dir.exists()
    assert not lock_parent.exists()


def test_cli_new_take_empty_pass_to_shot_mapping_returns_reroll_error_before_runner(
    monkeypatch,
):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    project = "_fixture_empty_pass_mapping"
    CoreProjectPaths.for_project(project).project_root.mkdir(exist_ok=True)
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps(
            [
                {
                    "pass_id": "PASS_011",
                    "segments": [
                        {"segment_index": 0, "duration_s": 2.0, "prompt": "unmapped"}
                    ],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                }
            ]
        ),
        encoding="utf-8",
    )
    runner_ctor = MagicMock(side_effect=AssertionError("EpisodeRunner must not run"))
    step_runner_ctor = MagicMock()
    execution_store_ctor = MagicMock()

    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )
    monkeypatch.setattr(generate, "validate_all_passes", lambda passes: [])
    monkeypatch.setattr(generate, "get_provider_cost_per_second", lambda model: 0.0)
    monkeypatch.setattr(
        generate,
        "load_plan",
        lambda path: _plan(
            [_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]
        ),
    )
    monkeypatch.setattr(generate, "EpisodeRunner", runner_ctor)
    monkeypatch.setattr(generate, "StepRunner", step_runner_ctor)
    monkeypatch.setattr(generate, "ExecutionStore", execution_store_ctor)

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        dry_run=True,
        force_new_take=True,
    )

    assert result["success"] is False
    assert result["error"] == "reroll_requires_succeeded_primary"
    assert "could not resolve any shots" in result["message"]
    assert not scene_path(project, "ep_001", "PASS_011").exists()
    runner_ctor.assert_not_called()
    step_runner_ctor.assert_not_called()
    execution_store_ctor.assert_not_called()


def test_cli_new_take_dry_run_valid_reroll_is_read_only_and_writes_no_ops_log(
    tmp_path,
    monkeypatch,
):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    project = "_fixture_dry_run_valid_no_ops"
    (CoreProjectPaths.for_project(project).project_root).mkdir(exist_ok=True)
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps(
            [
                {
                    "pass_id": "PASS_011",
                    "segments": [
                        {"source_shot_id": "EP001_SH23"},
                        {"source_shot_id": "EP001_SH24"},
                        {"source_shot_id": "EP001_SH25"},
                    ],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                }
            ]
        ),
        encoding="utf-8",
    )

    beat = _succeeded_beat(paths.video_dir)
    scene_disk_path = scene_path(project, "ep_001", "PASS_011")
    save_scene(Scene(scene_id="PASS_011", beats=[beat]), scene_disk_path)
    before_scene_json = scene_disk_path.read_text(encoding="utf-8")
    before_videos = sorted(
        p.relative_to(paths.video_dir).as_posix()
        for p in paths.video_dir.rglob("*")
        if p.is_file()
    )
    ops_log_path = tmp_path / "_dispatch_logs" / "ops.log.jsonl"
    monkeypatch.setenv("RECOIL_OPS_LOG_PATH", str(ops_log_path))
    dispatch = MagicMock(side_effect=AssertionError("_dispatch_one_beat must not run"))

    assert not ops_log_path.exists()

    monkeypatch.setattr(EpisodeRunner, "_dispatch_one_beat", dispatch)
    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )
    monkeypatch.setattr(generate, "validate_all_passes", lambda passes: [])
    monkeypatch.setattr(generate, "get_provider_cost_per_second", lambda model: 0.0)
    monkeypatch.setattr(
        generate,
        "load_plan",
        lambda path: _plan(
            [_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]
        ),
    )
    monkeypatch.setattr(
        generate,
        "StepRunner",
        MagicMock(side_effect=AssertionError("StepRunner must not run")),
    )
    monkeypatch.setattr(
        generate,
        "ExecutionStore",
        MagicMock(side_effect=AssertionError("ExecutionStore must not run")),
    )

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        dry_run=True,
        force_new_take=True,
    )

    assert result["success"] is True
    assert result["dry_run"] is True
    assert not ops_log_path.exists()
    assert scene_disk_path.read_text(encoding="utf-8") == before_scene_json
    assert sorted(
        p.relative_to(paths.video_dir).as_posix()
        for p in paths.video_dir.rglob("*")
        if p.is_file()
    ) == before_videos
    dispatch.assert_not_called()


def test_cli_new_take_dry_run_non_target_phantom_returns_reroll_error_without_dispatch(
    monkeypatch,
):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    project = "_fixture_dry_run_non_target_phantom"
    (CoreProjectPaths.for_project(project).project_root).mkdir(exist_ok=True)
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps(
            [
                {
                    "pass_id": "PASS_011",
                    "segments": [
                        {"source_shot_id": "EP001_SH23"},
                        {"source_shot_id": "EP001_SH24"},
                        {"source_shot_id": "EP001_SH25"},
                    ],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                }
            ]
        ),
        encoding="utf-8",
    )

    target = _succeeded_beat(paths.video_dir)
    non_target = _succeeded_beat(paths.video_dir / "non_target")
    non_target.beat_id = "PASS_011__non_target_i2v"
    non_target.beat_metadata["modality"] = "video_i2v"
    missing_artifact = Path(
        non_target.takes[0].workflow.steps[0].receipt.run_result.output_path
    )
    missing_artifact.unlink()
    save_scene(
        Scene(scene_id="PASS_011", beats=[target, non_target]),
        scene_path(project, "ep_001", "PASS_011"),
    )

    dispatch = MagicMock(side_effect=AssertionError("_dispatch_one_beat must not run"))
    monkeypatch.setattr(EpisodeRunner, "_dispatch_one_beat", dispatch)
    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )
    monkeypatch.setattr(generate, "validate_all_passes", lambda passes: [])
    monkeypatch.setattr(generate, "get_provider_cost_per_second", lambda model: 0.0)
    monkeypatch.setattr(
        generate,
        "load_plan",
        lambda path: _plan(
            [_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]
        ),
    )

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        dry_run=True,
        force_new_take=True,
    )

    assert result["success"] is False
    assert result["error"] == "reroll_requires_succeeded_primary"
    dispatch.assert_not_called()


def test_cli_live_new_take_failed_preflight_removes_transient_passes_dir(
    monkeypatch,
):
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.execution.step_types import ProjectPaths

    project = "_fixture_live_preflight_transient_passes_dir"
    core_paths = CoreProjectPaths.for_project(project)
    core_paths.project_root.mkdir(exist_ok=True)
    paths = ProjectPaths.for_episode(project, 1)
    paths.coverage_passes_dir.mkdir(parents=True, exist_ok=True)
    (paths.coverage_passes_dir / "ep_001_passes.json").write_text(
        json.dumps(
            [
                {
                    "pass_id": "PASS_011",
                    "segments": [
                        {"source_shot_id": "EP001_SH23"},
                        {"source_shot_id": "EP001_SH24"},
                        {"source_shot_id": "EP001_SH25"},
                    ],
                    "duration_s": 2.0,
                    "takes_count": 1,
                    "element_config": {"location_id": "L1"},
                }
            ]
        ),
        encoding="utf-8",
    )
    beat = _succeeded_beat(paths.video_dir)
    primary_video = Path(beat.primary_take.workflow.steps[0].receipt.run_result.output_path)
    primary_video.unlink()
    save_scene(
        Scene(scene_id="PASS_011", beats=[beat]),
        scene_path(project, "ep_001", "PASS_011"),
    )
    passes_dir = core_paths.passes_dir
    assert not passes_dir.exists()
    submit = MagicMock()

    monkeypatch.setattr(
        generate.CoveragePass,
        "from_dict",
        staticmethod(lambda d: SimpleNamespace(pass_id=d["pass_id"])),
    )
    monkeypatch.setattr(generate, "validate_all_passes", lambda passes: [])
    monkeypatch.setattr(generate, "get_provider_cost_per_second", lambda model: 0.0)
    monkeypatch.setattr(
        generate,
        "load_plan",
        lambda path: _plan(
            [_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]
        ),
    )
    monkeypatch.setattr(
        generate,
        "StepRunner",
        lambda store, paths, episode=None: _FakeStepRunner(paths.video_dir, submit),
    )

    result = generate.run_generation(
        project=project,
        episode=1,
        pass_ids=["PASS_011"],
        force_new_take=True,
    )

    assert result["success"] is False
    assert result["error"] == "reroll_requires_succeeded_primary"
    assert not passes_dir.exists()
    submit.assert_not_called()


def test_new_take_rejects_single_pass_fanout_before_submit(tmp_path):
    submit = MagicMock()
    runner = _runner(_FakeStepRunner(tmp_path / "video", submit))
    shots = [_shot(f"EP001_SH{i:02d}", i) for i in range(1, 8)]

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_episode_batches(
                _plan(shots),
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert exc.value.error_code == "new_take_requires_single_r2v_multi_beat"
    submit.assert_not_called()


def test_cli_seed_without_new_take_returns_structured_error_without_selector(
    monkeypatch,
    capsys,
):
    monkeypatch.setattr(
        sys,
        "argv",
        [
            "generate.py",
            "--project",
            "fixture",
            "--episode",
            "1",
            "--seed",
            "123",
        ],
    )

    exit_code = generate.main()
    result = json.loads(capsys.readouterr().out)

    assert exit_code == generate.EXIT_VALIDATION
    assert result["success"] is False
    assert result["error"] == "flag_requires_new_take"


def test_ref_drift_preflight_rejects_nonempty_stored_different_nonempty_fresh_before_mutation(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    ref = _write_location_ref()
    runner = _runner(_FakeStepRunner(video_dir, submit))
    payload = runner._build_reroll_preflight_payload(beat)
    fresh_refs = payload.get("reference_images") or []
    assert str(ref) in fresh_refs
    fresh_fp = EpisodeRunner._compute_inputs_fingerprint(fresh_refs)
    stored_fp = EpisodeRunner._compute_inputs_fingerprint(["/refs/old-location.png"])
    assert stored_fp != fresh_fp
    beat.beat_metadata["inputs_fingerprint"] = stored_fp
    path = scene_path("fixture", "ep_001", "PASS_011")
    save_scene(Scene(scene_id="PASS_011", beats=[beat]), path)
    before = path.read_text()

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_episode_batches(
                _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]),
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert exc.value.error_code == "reroll_refs_drifted"
    submit.assert_not_called()
    assert path.read_text() == before


def test_ref_drift_preflight_rejects_nonempty_stored_empty_fresh_before_mutation(tmp_path):
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    runner = _runner(_FakeStepRunner(video_dir, submit))
    payload = runner._build_reroll_preflight_payload(beat)
    fresh_refs = payload.get("reference_images") or []
    assert EpisodeRunner._compute_inputs_fingerprint(fresh_refs) == ""
    beat.beat_metadata["inputs_fingerprint"] = EpisodeRunner._compute_inputs_fingerprint(
        ["/refs/removed-location.png"]
    )
    path = scene_path("fixture", "ep_001", "PASS_011")
    save_scene(Scene(scene_id="PASS_011", beats=[beat]), path)
    before = path.read_text()

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_episode_batches(
                _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]),
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert exc.value.error_code == "reroll_refs_drifted"
    submit.assert_not_called()
    assert path.read_text() == before


def test_character_reroll_dry_run_preflight_resolves_refs_without_author_call(
    tmp_path,
    monkeypatch,
):
    from recoil.pipeline._lib import dispatch_payload

    submit = MagicMock()
    video_dir = tmp_path / "video"
    _write_character_ref()
    _write_location_ref()
    shots = _character_shots()
    _assert_directed_prose_character_fixture(shots)
    beat = _character_succeeded_beat(video_dir)
    runner = _runner(_FakeStepRunner(video_dir, submit))

    author_pass = MagicMock(side_effect=AssertionError("author_pass must not run"))
    opus = MagicMock(side_effect=AssertionError("call_opus_oauth must not run"))
    monkeypatch.setattr(dispatch_payload, "author_pass", author_pass)
    monkeypatch.setattr(dispatch_payload, "call_opus_oauth", opus)

    payload = runner._build_reroll_preflight_payload(beat)
    fresh_refs = payload.get("reference_images") or []
    assert fresh_refs
    assert any("jade_identity_hero_v01.png" in ref for ref in fresh_refs)
    beat.beat_metadata["inputs_fingerprint"] = EpisodeRunner._compute_inputs_fingerprint(
        fresh_refs
    )
    save_scene(
        Scene(scene_id="PASS_011", beats=[beat]),
        scene_path("fixture", "ep_001", "PASS_011"),
    )

    scenes = asyncio.run(
        runner.run_episode_batches(
            _plan(shots),
            dry_run=True,
            force_single_batch=True,
            pass_id="PASS_011",
            force_new_take=True,
        )
    )

    assert len(scenes) == 1
    submit.assert_not_called()
    author_pass.assert_not_called()
    opus.assert_not_called()


def test_character_reroll_drift_preflight_rejects_without_author_call(
    tmp_path,
    monkeypatch,
):
    from recoil.pipeline._lib import dispatch_payload

    submit = MagicMock()
    video_dir = tmp_path / "video"
    _write_character_ref()
    _write_location_ref()
    shots = _character_shots()
    _assert_directed_prose_character_fixture(shots)
    beat = _character_succeeded_beat(video_dir)
    beat.beat_metadata["inputs_fingerprint"] = EpisodeRunner._compute_inputs_fingerprint(
        ["/refs/old-character.png"]
    )
    path = scene_path("fixture", "ep_001", "PASS_011")
    save_scene(Scene(scene_id="PASS_011", beats=[beat]), path)
    before = path.read_text()
    runner = _runner(_FakeStepRunner(video_dir, submit))

    author_pass = MagicMock(side_effect=AssertionError("author_pass must not run"))
    opus = MagicMock(side_effect=AssertionError("call_opus_oauth must not run"))
    monkeypatch.setattr(dispatch_payload, "author_pass", author_pass)
    monkeypatch.setattr(dispatch_payload, "call_opus_oauth", opus)

    with pytest.raises(RerollPreflightError) as exc:
        asyncio.run(
            runner.run_episode_batches(
                _plan(shots),
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert exc.value.error_code == "reroll_refs_drifted"
    submit.assert_not_called()
    author_pass.assert_not_called()
    opus.assert_not_called()
    assert path.read_text() == before


def test_character_reroll_non_target_revalidation_skips_author_call(
    tmp_path,
    monkeypatch,
):
    from recoil.pipeline._lib import dispatch_payload

    video_dir = tmp_path / "video"
    _write_character_ref()
    _write_location_ref()
    shots = _character_shots()
    _assert_directed_prose_character_fixture(shots)
    beat = _character_succeeded_beat(video_dir)
    beat.beat_id = "PASS_011__non_target_cov"
    runner = _runner(_FakeStepRunner(video_dir, MagicMock()))

    skip_author_payload = runner._build_reroll_preflight_payload(beat)
    skip_author_refs = skip_author_payload.get("reference_images") or []
    assert skip_author_refs

    fake_author = MagicMock(return_value=_authored_directed_prose())
    monkeypatch.setattr(dispatch_payload, "author_pass", fake_author)
    authored_payload = dispatch_payload.build_dispatch_payload(
        shot=shots[0],
        project="fixture",
        modality="r2v_multi",
        episode="ep_001",
        batch_shots=shots,
        dry_run=True,
    )
    assert authored_payload.get("reference_images") == skip_author_refs
    fake_author.assert_called()

    beat.beat_metadata["inputs_fingerprint"] = EpisodeRunner._compute_inputs_fingerprint(
        ["/refs/old-character.png"]
    )
    author_pass = MagicMock(side_effect=AssertionError("author_pass must not run"))
    opus = MagicMock(side_effect=AssertionError("call_opus_oauth must not run"))
    monkeypatch.setattr(dispatch_payload, "author_pass", author_pass)
    monkeypatch.setattr(dispatch_payload, "call_opus_oauth", opus)

    # This is the exact read-only non-target check run by the reroll preflight
    # after the target beat is temporarily removed from the scene.
    with pytest.raises(RerollPreflightError) as exc:
        runner.revalidate_succeeded_fingerprints(
            Scene(scene_id="PASS_011", beats=[beat]),
            mutate=False,
        )

    assert exc.value.error_code == "reroll_refs_drifted"
    author_pass.assert_not_called()
    opus.assert_not_called()


def test_fresh_full_coverage_pass_id_uses_pass_ordinal(tmp_path):
    # Regression guard for the 2026-06-08 pass_counter fix: a FRESH dispatch of a
    # real coverage-pass id (EP001_PASS_011_...) must name the take with the real
    # ordinal (PASS_011), not the one-beat-scene index collapse (PASS_001).
    from recoil.pipeline.core.dispatch import register_default_runners

    submit = MagicMock()
    video_dir = tmp_path / "video"
    sr = _PassCounterAwareStepRunner(video_dir, submit)
    runner = _runner(sr)
    # Fresh dispatch (no force_new_take) does not re-register runners, so wire
    # THIS test's fake explicitly with force=True — overriding any global runner
    # registry left by a prior reroll test (which registers with force=True).
    register_default_runners(sr, force=True)
    pass_id = "EP001_PASS_011_SH23_24_25_N_ENV"

    scenes = asyncio.run(
        runner.run_episode_batches(
            _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]),
            force_single_batch=True,
            pass_id=pass_id,
        )
    )

    beat = scenes[0].beats[0]
    assert beat.beat_id == f"{pass_id}__cov"
    assert beat.takes[0].workflow.steps[0].payload["pass_counter"] == 11
    assert _pass_take_path(video_dir, 1, pass_counter=11).exists()
    assert not _pass_take_path(video_dir, 1, pass_counter=1).exists()
    assert submit.call_count == 1
