import asyncio
from pathlib import Path

import pytest

from recoil.pipeline._lib.plan_loader import CanonicalPlan, CanonicalShot
from recoil.pipeline.core.persistence import (
    load_manifest,
    load_scene,
    save_scene,
    scene_path,
    scene_version_path,
)
from recoil.pipeline.core.scene_version_store import SceneVersionStore
from recoil.pipeline.core.take import Beat, Scene
from recoil.pipeline.orchestrator.episode_runner import EpisodeRunner


def _shot(
    batch_index: int,
    shot_index: int,
    *,
    shots_per_batch: int = 3,
) -> CanonicalShot:
    ordinal = (batch_index - 1) * shots_per_batch + shot_index
    return CanonicalShot(
        shot_id=f"EP001_SH{ordinal:02d}",
        scene_index=batch_index,
        sequence_id=None,
        pipeline="video",
        previs_model=None,
        video_model="seeddance-2.0",
        location_id=f"LOC_{batch_index}",
        characters=[],
        shot_type="MS",
        duration_s=2.0,
        is_env_only=False,
        has_dialogue=False,
        aspect_ratio="9:16",
        quality=None,
        cinematography=None,
        raw={},
    )


def _canonical_plan() -> CanonicalPlan:
    shots = [
        _shot(batch_index, shot_index)
        for batch_index in range(1, 5)
        for shot_index in range(1, 4)
    ]
    return CanonicalPlan(
        episode_id="ep_001",
        project="fixture",
        shots=shots,
        source_path=Path("fixture_plan.json"),
        raw={
            "episode_id": "ep_001",
            "project": "fixture",
            "shots": [{"shot_id": shot.shot_id} for shot in shots],
        },
    )


def _grown_opening_plan() -> CanonicalPlan:
    shots = [
        _shot(batch_index, shot_index, shots_per_batch=4)
        for batch_index in range(1, 5)
        for shot_index in range(1, 5)
    ]
    return CanonicalPlan(
        episode_id="ep_001",
        project="fixture",
        shots=shots,
        source_path=Path("fixture_grown_plan.json"),
        raw={
            "episode_id": "ep_001",
            "project": "fixture",
            "shots": [{"shot_id": shot.shot_id} for shot in shots],
        },
    )


def _stale_scene(
    scene_id: str,
    *,
    locked: bool = False,
    board_artifact: str | None = None,
) -> Scene:
    beat = Beat(
        beat_id=f"OLD_{scene_id}",
        beat_metadata={"marker": "stale"},
        board=(
            {
                "status": "approved",
                "artifact": board_artifact,
                "source_sha256": "old-board",
                "approved_by": "JT",
                "updated_at": "2026-06-13T00:00:00Z",
            }
            if board_artifact
            else None
        ),
    )
    return Scene(
        scene_id=scene_id,
        beats=[beat],
        scene_metadata={"marker": "stale"},
        locked=locked,
        lock_reason="staged" if locked else None,
    )


def _plant_stale_scenes() -> dict[str, bytes]:
    original: dict[str, bytes] = {}
    for index in range(1, 5):
        scene_id = f"BATCH_{index:03d}"
        path = scene_path("fixture", "ep_001", scene_id)
        save_scene(_stale_scene(scene_id, locked=scene_id == "BATCH_004"), path)
        original[scene_id] = path.read_bytes()
    return original


async def _noop_run_scene(self, scene, **kwargs):  # noqa: ANN001, ANN003
    return scene


def _runner(plan: CanonicalPlan) -> EpisodeRunner:
    return EpisodeRunner(
        project="fixture",
        plan=plan.raw,
        casting={},
        episode="ep_001",
        concurrency=1,
    )


def _configure_projects_root(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    (tmp_path / ".recoil-data-root").write_text("recoil-data-root\n", encoding="utf-8")
    (tmp_path / "fixture").mkdir()
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    monkeypatch.setenv("RECOIL_BOARD_GATE", "0")


def test_breakdown_regen_treats_locked_scene_as_inert(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
    caplog: pytest.LogCaptureFixture,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """REC-231 Phase 7: ``scene.locked`` is inert — the breakdown regen no longer SKIPS a
    locked scene (the lock-as-clobber-guard + the ``--force-scene-overwrite`` / ``.bak``
    overwrite path are DELETED). BATCH_004 stages a candidate exactly like the
    unlocked batches; nothing is reported skipped."""
    _configure_projects_root(tmp_path, monkeypatch)
    monkeypatch.setattr(EpisodeRunner, "run_scene", _noop_run_scene)
    original = _plant_stale_scenes()
    plan = _canonical_plan()

    caplog.set_level("WARNING")
    scenes = asyncio.run(_runner(plan).run_episode_batches(plan))

    assert scenes == []
    for index in range(1, 5):
        scene_id = f"BATCH_{index:03d}"
        path = scene_path("fixture", "ep_001", scene_id)
        assert path.read_bytes() == original[scene_id]
        manifest = load_manifest("fixture", "ep_001", scene_id)
        assert manifest is not None
        assert manifest["active_version"] == 1
        assert [entry["version"] for entry in manifest["versions"]] == [1, 2]
        assert manifest["versions"][1]["downstream"] == "not_derived"
        candidate = load_scene(scene_version_path("fixture", "ep_001", scene_id, 2))
        assert candidate.scene_id == scene_id
        assert candidate.beats[0].beat_id == scene_id
        # The (formerly locked) BATCH_004 is re-derived like any other — lock is inert.
        assert candidate.locked is False

    assert "SKIP BATCH_004" not in caplog.text
    assert (
        "scenes: wrote 0 (scenes actually persisted by the guarded writer this run), "
        "SKIPPED 0 locked ()"
    ) in capsys.readouterr().out


def test_opening_grows_no_renumber_under_inert_lock(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """REC-231 Phase 7: a grown opening stages the refreshed topology as a candidate.
    It keeps its ``scene_id`` and is NEVER renumbered to BATCH_005. The fixed thing is
    the batch IDENTITY (Phase 6 identity-halt), never the lock; with the lock inert the
    scene is re-derived like any other batch."""
    _configure_projects_root(tmp_path, monkeypatch)
    monkeypatch.setattr(EpisodeRunner, "run_scene", _noop_run_scene)

    for index in range(1, 5):
        scene_id = f"BATCH_{index:03d}"
        path = scene_path("fixture", "ep_001", scene_id)
        save_scene(_stale_scene(scene_id, locked=scene_id == "BATCH_004"), path)

    plan = _grown_opening_plan()
    scenes = asyncio.run(_runner(plan).run_episode_batches(plan))

    # No renumber: the four batch identities are stable; no BATCH_005 is created.
    assert scenes == []
    assert not scene_path("fixture", "ep_001", "BATCH_005").exists()
    for index in range(1, 5):
        scene_id = f"BATCH_{index:03d}"
        restored = load_scene(scene_path("fixture", "ep_001", scene_id))
        assert restored.beats[0].beat_metadata["marker"] == "stale"
        manifest = load_manifest("fixture", "ep_001", scene_id)
        assert manifest is not None
        assert manifest["active_version"] == 1
        candidate = load_scene(scene_version_path("fixture", "ep_001", scene_id, 2))
        assert candidate.scene_id == scene_id
        assert candidate.beats[0].beat_id == scene_id
        assert len(candidate.beats[0].beat_metadata["batch_shots"]) == 4
    assert "SKIPPED 0 locked ()" in capsys.readouterr().out


def test_registered_body_missing_structural_metadata_stages_candidate(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
    capsys: pytest.CaptureFixture[str],
) -> None:
    _configure_projects_root(tmp_path, monkeypatch)

    async def _fail_if_dispatched(self, scene, **kwargs):  # noqa: ANN001, ANN003
        raise AssertionError("registered incomplete body must stage a candidate, not dispatch")

    monkeypatch.setattr(EpisodeRunner, "run_scene", _fail_if_dispatched)
    minimal = Scene(
        scene_id="BATCH_001",
        beats=[Beat("BATCH_001", beat_metadata={"modality": "r2v_multi"})],
        scene_metadata={"marker": "registered-minimal"},
    )
    flat = scene_path("fixture", "ep_001", "BATCH_001")
    save_scene(minimal, flat)
    original = flat.read_bytes()
    store = SceneVersionStore("fixture", "ep_001")
    store.write_scene_candidate(
        "BATCH_001",
        Scene(
            scene_id="BATCH_001",
            beats=[
                Beat(
                    "BATCH_001",
                    beat_metadata={
                        "modality": "r2v_multi",
                        "batch_summary": {"old_candidate": True},
                    },
                )
            ],
        ),
    )

    scenes = asyncio.run(
        _runner(_canonical_plan()).run_episode_batches(
            _canonical_plan(),
            only_scene_ids={"BATCH_001"},
        )
    )

    assert scenes == []
    assert flat.read_bytes() == original
    manifest = load_manifest("fixture", "ep_001", "BATCH_001")
    assert manifest is not None
    assert manifest["active_version"] == 1
    assert [entry["version"] for entry in manifest["versions"]] == [1, 2, 3]
    candidate = load_scene(scene_version_path("fixture", "ep_001", "BATCH_001", 3))
    assert candidate.beats[0].beat_metadata["batch_shots"]
    assert "registered-version-metadata: appended candidate v3" in capsys.readouterr().out
