"""CP-10 Beat persistence — round-trip + atomic-write + idempotent init."""
import asyncio
import json
import pytest

from recoil.pipeline.core.persistence import (
    save_scene, load_scene, scene_path, list_scenes, init_scenes_from_plan,
    SCHEMA_VERSION,
    scene_manifest_path, scene_version_path, manifest_artifact_path,
    load_manifest, load_scene_active, load_scene_active_with_version,
    active_version, active_scene_body_path, save_active_scene_status,
    SceneStructureImmutableError,
)
from recoil.pipeline._lib import derivation_manifest
from recoil.pipeline._lib.schema_versions import SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION
from recoil.pipeline.core.scene_version_store import SceneVersionStore
from recoil.pipeline.core.take import Scene, Beat
from recoil.pipeline.core.workflow import Workflow, WorkflowStep


def _make_wf(wid="wf"):
    return Workflow(
        workflow_id=wid,
        steps=[WorkflowStep(step_id="kf", modality="image_t2i",
                            payload={"prompt": "x"})],
    )


def test_round_trip_3beats_5takes(tmp_path):
    scene = Scene(scene_id="SEQ11")
    for i in range(3):
        b = Beat(beat_id=f"B{i}")
        for _ in range(5):
            b.new_take(workflow=_make_wf(f"wf_{i}_{_}"))
        scene.add_beat(b)
    p = tmp_path / "out.json"
    save_scene(scene, p)
    loaded = load_scene(p)
    assert loaded.scene_id == "SEQ11"
    assert len(loaded.beats) == 3
    for b in loaded.beats:
        assert len(b.takes) == 5


def test_schema_version_field(tmp_path):
    scene = Scene(scene_id="SEQ1")
    p = tmp_path / "s.json"
    save_scene(scene, p)
    raw = json.loads(p.read_text())
    assert raw["schema_version"] == SCHEMA_VERSION


def test_load_rejects_wrong_schema(tmp_path):
    p = tmp_path / "s.json"
    p.write_text(json.dumps({"schema_version": 999, "scene_id": "X", "beats": []}))
    with pytest.raises(KeyError, match="schema_version mismatch"):
        load_scene(p)


def test_load_missing_file(tmp_path):
    with pytest.raises(FileNotFoundError):
        load_scene(tmp_path / "ghost.json")


def test_atomic_no_partial_on_pre_existing(tmp_path):
    """A pre-existing tmp file is overwritten cleanly."""
    p = tmp_path / "s.json"
    tmp = p.with_suffix(p.suffix + ".tmp")
    tmp.write_text("garbage")
    save_scene(Scene(scene_id="OK"), p)
    assert p.exists()
    assert not tmp.exists()  # os.replace consumed it


def test_idempotent_init_from_plan(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    plan = {
        "sequences": {
            "SEQ11": {"shots": [{"shot_id": "EP001_SH01"},
                                {"shot_id": "EP001_SH02"}]},
            "SEQ12": {"shots": [{"shot_id": "EP001_SH03"}]},
        }
    }
    first = init_scenes_from_plan("pX", "ep_001", plan)
    assert len(first) == 2
    assert sum(len(s.beats) for s in first) == 3
    # Re-run is idempotent: existing files not overwritten with empty data.
    second = init_scenes_from_plan("pX", "ep_001", plan)
    assert len(second) == 2


def test_list_scenes_empty(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    assert list_scenes("p", "ep_001") == []


def test_scene_path_pure(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    p = scene_path("driver-beware", "ep_001", "SEQ11")
    assert p.parts[-3:] == ("orchestration", "scenes", "ep_001_SEQ11.json")
    assert not p.exists()  # pure path computation


def test_scene_path_normalizes_episode_token(tmp_path, monkeypatch):
    """The 2026-06-03 `1_BATCH_*` pollution: a bare int/number episode must NEVER
    fork the namespace. int 1, str "1", and the canonical "ep_001" all collapse to
    the single canonical filename; a malformed token raises instead of writing junk."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    canonical = "ep_001_BATCH_001.json"
    assert scene_path("p", 1, "BATCH_001").name == canonical          # int (the bug)
    assert scene_path("p", "1", "BATCH_001").name == canonical        # bare numeric str
    assert scene_path("p", "ep_001", "BATCH_001").name == canonical   # already canonical
    assert scene_path("p", 12, "BATCH_001").name == "ep_012_BATCH_001.json"
    with pytest.raises(ValueError):
        scene_path("p", "garbage", "BATCH_001")
    with pytest.raises(ValueError):
        scene_path("p", -1, "BATCH_001")     # negative int → no 'ep_-01' divergence
    with pytest.raises(ValueError):
        scene_path("p", True, "BATCH_001")   # bool is not an episode (isinstance(True, int) trap)


# ── Scene versioning (REC-231 Phase 1): manifest schema + pointer-faithful loader ──


def _version_entry(version, artifact, *, state, downstream):
    """A minimal manifest version entry carrying only the reader-required keys."""
    return {"version": version, "artifact": artifact,
            "state": state, "downstream": downstream}


def _write_manifest(project, episode, batch_id, manifest):
    path = scene_manifest_path(project, episode, batch_id)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(manifest))
    return path


def test_manifest_v1_default_byte_identical(tmp_path, monkeypatch):
    """The load-bearing no-flag-day gate: a flat scene with NO manifest loads as v1
    active, byte-identical, and load_scene_active creates no manifest on read."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    scene = Scene(scene_id="BATCH_001", beats=[Beat(beat_id="B0")])
    flat = scene_path("p", "ep_001", "BATCH_001")
    save_scene(scene, flat)

    active = load_scene_active("p", "ep_001", "BATCH_001")
    assert active.to_dict() == load_scene(flat).to_dict()
    # the atomic primitive reports implicit v1 active
    s, ver = load_scene_active_with_version("p", "ep_001", "BATCH_001")
    assert ver == 1
    assert s.to_dict() == load_scene(flat).to_dict()
    # NO .versions.json was created on read
    assert not scene_manifest_path("p", "ep_001", "BATCH_001").exists()


def test_load_scene_active_resolves_pointer(tmp_path, monkeypatch):
    """A 2-version manifest: load_scene_active returns the ACTIVE version's body,
    distinguished by beat content; flipping active_version flips the loaded body."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    # both bodies share scene_id (same batch — loader asserts identity) but differ in beats
    v1 = Scene(scene_id="BATCH_001", beats=[Beat(beat_id="B0")])
    v2 = Scene(scene_id="BATCH_001", beats=[Beat(beat_id="B0"), Beat(beat_id="B1")])
    save_scene(v1, scene_version_path("p", "ep_001", "BATCH_001", 1))
    save_scene(v2, scene_version_path("p", "ep_001", "BATCH_001", 2))
    manifest = {
        "schema_version": SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION,
        "batch_id": "BATCH_001",
        "active_version": 1,
        "versions": [
            _version_entry(1, "ep_001_BATCH_001.v001.json",
                           state="approved", downstream="derived"),
            _version_entry(2, "ep_001_BATCH_001.v002.json",
                           state="candidate", downstream="not_derived"),
        ],
    }
    _write_manifest("p", "ep_001", "BATCH_001", manifest)

    # active_version=1 → v001 body (one beat)
    assert [b.beat_id for b in load_scene_active("p", "ep_001", "BATCH_001").beats] == ["B0"]
    s1, ver1 = load_scene_active_with_version("p", "ep_001", "BATCH_001")
    assert ver1 == 1 and [b.beat_id for b in s1.beats] == ["B0"]

    # flip the pointer to v2 → v002 body (two beats)
    manifest["active_version"] = 2
    _write_manifest("p", "ep_001", "BATCH_001", manifest)
    assert [b.beat_id for b in load_scene_active("p", "ep_001", "BATCH_001").beats] == ["B0", "B1"]
    s2, ver2 = load_scene_active_with_version("p", "ep_001", "BATCH_001")
    assert ver2 == 2 and [b.beat_id for b in s2.beats] == ["B0", "B1"]


def test_version_path_padding(tmp_path, monkeypatch):
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    p = scene_version_path("p", 1, "BATCH_001", 2)
    assert p.name.endswith("ep_001_BATCH_001.v002.json")


def test_manifest_schema_mismatch_raises(tmp_path, monkeypatch):
    """A schema_version mismatch raises KeyError; a cross-batch (copied) manifest
    raises ValueError on read via both load_manifest and load_scene_active."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    # schema_version mismatch → KeyError
    _write_manifest("p", "ep_001", "BATCH_001", {
        "schema_version": 99, "batch_id": "BATCH_001",
        "active_version": 1, "versions": [],
    })
    with pytest.raises(KeyError):
        load_manifest("p", "ep_001", "BATCH_001")

    # identity mismatch: a manifest whose batch_id != the requested batch → ValueError
    _write_manifest("p", "ep_001", "BATCH_001", {
        "schema_version": SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION,
        "batch_id": "BATCH_999", "active_version": 1,
        "versions": [_version_entry(1, "ep_001_BATCH_001.json",
                                    state="approved", downstream="derived")],
    })
    with pytest.raises(ValueError):
        load_manifest("p", "ep_001", "BATCH_001")
    with pytest.raises(ValueError):
        load_scene_active("p", "ep_001", "BATCH_001")


def test_manifest_artifact_path_resolves_flat_v1(tmp_path, monkeypatch):
    """A v1 entry recording the FLAT basename resolves to the flat file; a v2 entry
    recording `.v002.json` resolves to the versioned body — and load_scene_active
    returns whichever the pointer references."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    flat_body = Scene(scene_id="BATCH_001", beats=[Beat(beat_id="FLAT")])
    save_scene(flat_body, scene_path("p", "ep_001", "BATCH_001"))
    v2_body = Scene(scene_id="BATCH_001", beats=[Beat(beat_id="V2")])
    save_scene(v2_body, scene_version_path("p", "ep_001", "BATCH_001", 2))
    manifest = {
        "schema_version": SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION,
        "batch_id": "BATCH_001",
        "active_version": 1,
        "versions": [
            _version_entry(1, "ep_001_BATCH_001.json",
                           state="approved", downstream="derived"),
            _version_entry(2, "ep_001_BATCH_001.v002.json",
                           state="candidate", downstream="not_derived"),
        ],
    }

    # v1 artifact (the flat name) resolves to the on-disk flat file
    p1 = manifest_artifact_path("p", "ep_001", "BATCH_001", manifest, 1)
    assert p1 == scene_path("p", "ep_001", "BATCH_001")
    assert p1.name == "ep_001_BATCH_001.json"
    # v2 artifact resolves to the versioned body
    p2 = manifest_artifact_path("p", "ep_001", "BATCH_001", manifest, 2)
    assert p2.name == "ep_001_BATCH_001.v002.json"

    # load_scene_active honors the pointer: active=1 → flat body; active=2 → versioned body
    _write_manifest("p", "ep_001", "BATCH_001", manifest)
    assert [b.beat_id for b in load_scene_active("p", "ep_001", "BATCH_001").beats] == ["FLAT"]
    manifest["active_version"] = 2
    _write_manifest("p", "ep_001", "BATCH_001", manifest)
    assert [b.beat_id for b in load_scene_active("p", "ep_001", "BATCH_001").beats] == ["V2"]


def test_manifest_artifact_path_rejects_traversal(tmp_path, monkeypatch):
    """An artifact containing a path separator or `..` raises ValueError (path-safety)."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    for bad in ("../evil.json", "sub/evil.json", "/abs/evil.json", "a/../evil.json"):
        manifest = {
            "schema_version": SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION,
            "batch_id": "BATCH_001", "active_version": 1,
            "versions": [_version_entry(1, bad, state="approved", downstream="derived")],
        }
        with pytest.raises(ValueError):
            manifest_artifact_path("p", "ep_001", "BATCH_001", manifest, 1)


def test_list_scenes_excludes_manifest_and_versions(tmp_path, monkeypatch):
    """In a dir holding the flat scene, its manifest, and a version body, list_scenes
    returns ONLY the flat active-scene identity."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    body = Scene(scene_id="BATCH_001", beats=[Beat(beat_id="B0")])
    save_scene(body, scene_path("p", 1, "BATCH_001"))               # flat
    save_scene(body, scene_version_path("p", 1, "BATCH_001", 2))    # version body
    scene_manifest_path("p", 1, "BATCH_001").write_text(json.dumps({  # manifest
        "schema_version": SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION,
        "batch_id": "BATCH_001", "active_version": 1, "versions": [],
    }))

    listed = list_scenes("p", 1)
    assert [p.name for p in listed] == ["ep_001_BATCH_001.json"]


# ── Scene versioning (REC-231 Phase 2): append-only candidate writer ──────────


def test_write_scene_candidate_legacy_materialize(tmp_path, monkeypatch):
    """First re-derive of a flat batch materializes the manifest (flat → v1
    legacy/approved/active) and appends the new structure as v2 candidate."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    flat = scene_path("p", "ep_001", "BATCH_001")
    save_scene(Scene(scene_id="BATCH_001", beats=[Beat(beat_id="OPENING")]), flat)

    n = SceneVersionStore("p", "ep_001").write_scene_candidate(
        "BATCH_001", Scene(scene_id="BATCH_001", beats=[Beat(beat_id="A"), Beat(beat_id="B")])
    )
    assert n == 2

    manifest = load_manifest("p", "ep_001", "BATCH_001")
    assert manifest is not None
    assert manifest["active_version"] == 1
    v1 = manifest["versions"][0]
    assert v1["version"] == 1
    assert v1["source"] == "legacy_flat"
    assert v1["state"] == "approved"
    assert v1["artifact"] == "ep_001_BATCH_001.json"
    v2 = manifest["versions"][1]
    assert v2["version"] == 2
    assert v2["state"] == "candidate"
    assert v2["artifact"] == "ep_001_BATCH_001.v002.json"
    assert scene_version_path("p", "ep_001", "BATCH_001", 2).exists()


def test_write_scene_candidate_pointer_unmoved(tmp_path, monkeypatch):
    """After the append the pointer is still v1 and load_scene_active returns the v1
    (opening) body byte-identical to the original flat."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    flat = scene_path("p", "ep_001", "BATCH_001")
    save_scene(Scene(scene_id="BATCH_001", beats=[Beat(beat_id="OPENING")]), flat)
    flat_dict = load_scene(flat).to_dict()

    SceneVersionStore("p", "ep_001").write_scene_candidate(
        "BATCH_001", Scene(scene_id="BATCH_001", beats=[Beat(beat_id="NEW")])
    )

    assert load_manifest("p", "ep_001", "BATCH_001")["active_version"] == 1
    assert load_scene_active("p", "ep_001", "BATCH_001").to_dict() == flat_dict


def test_write_scene_candidate_body_write_once(tmp_path, monkeypatch):
    """A second append (v3) never rewrites an existing version body (v2 is immutable)."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    save_scene(Scene(scene_id="BATCH_001", beats=[Beat(beat_id="OPENING")]),
               scene_path("p", "ep_001", "BATCH_001"))
    store = SceneVersionStore("p", "ep_001")

    n2 = store.write_scene_candidate(
        "BATCH_001", Scene(scene_id="BATCH_001", beats=[Beat(beat_id="V2")]))
    assert n2 == 2
    v2_bytes = scene_version_path("p", "ep_001", "BATCH_001", 2).read_bytes()

    n3 = store.write_scene_candidate(
        "BATCH_001", Scene(scene_id="BATCH_001", beats=[Beat(beat_id="V3")]))
    assert n3 == 3
    assert scene_version_path("p", "ep_001", "BATCH_001", 2).read_bytes() == v2_bytes
    assert scene_version_path("p", "ep_001", "BATCH_001", 3).exists()


def test_orphan_body_recovery(tmp_path, monkeypatch):
    """A crashed prior append (a .v003.json body the manifest does NOT list) does not
    wedge the batch: the next append computes N=4 (max over manifest=2 AND filesystem=3,
    +1), and leaves the orphan untouched."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    save_scene(Scene(scene_id="BATCH_001", beats=[Beat(beat_id="OPENING")]),
               scene_path("p", "ep_001", "BATCH_001"))
    store = SceneVersionStore("p", "ep_001")
    store.write_scene_candidate(
        "BATCH_001", Scene(scene_id="BATCH_001", beats=[Beat(beat_id="V2")]))  # manifest max → 2

    orphan = scene_version_path("p", "ep_001", "BATCH_001", 3)
    save_scene(Scene(scene_id="BATCH_001", beats=[Beat(beat_id="ORPHAN")]), orphan)
    orphan_bytes = orphan.read_bytes()
    assert max(v["version"] for v in load_manifest("p", "ep_001", "BATCH_001")["versions"]) == 2

    n = store.write_scene_candidate(
        "BATCH_001", Scene(scene_id="BATCH_001", beats=[Beat(beat_id="V4")]))
    assert n == 4
    assert scene_version_path("p", "ep_001", "BATCH_001", 4).exists()
    assert orphan.read_bytes() == orphan_bytes  # orphan v003 untouched (deferred-prune)


def test_first_creation_stays_flat_implicit_v1(tmp_path, monkeypatch):
    """Case A: a brand-new batch (no manifest, no flat) persists the FLAT file with no
    manifest (implicit v1 active); the first re-derive (Case B) then materializes the
    manifest with v1=flat/legacy + v2=candidate."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    store = SceneVersionStore("p", "ep_001")
    brand_new = Scene(scene_id="BATCH_001", beats=[Beat(beat_id="OPENING")])
    assert store.write_scene_candidate("BATCH_001", brand_new) == 1
    assert scene_path("p", "ep_001", "BATCH_001").exists()
    assert not scene_manifest_path("p", "ep_001", "BATCH_001").exists()
    assert load_scene_active("p", "ep_001", "BATCH_001").to_dict() == brand_new.to_dict()

    assert store.write_scene_candidate(
        "BATCH_001", Scene(scene_id="BATCH_001", beats=[Beat("A"), Beat("B")])) == 2
    manifest = load_manifest("p", "ep_001", "BATCH_001")
    assert manifest["active_version"] == 1
    assert manifest["versions"][0]["source"] == "legacy_flat"
    assert manifest["versions"][1]["state"] == "candidate"


def test_init_scenes_first_write_flat_discoverable(tmp_path, monkeypatch):
    """init_scenes_from_plan persists a NEW batch as the FLAT file (no manifest), and
    batch_selector.resolve + list_scenes find it (no born-versioned stranding)."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    from recoil.pipeline.orchestrator import batch_selector

    plan = {"sequences": {"BATCH_001": {"shots": [{"shot_id": "EP001_SH01"}]}}}
    scenes = init_scenes_from_plan("fixture", "ep_001", plan)
    assert [s.scene_id for s in scenes] == ["BATCH_001"]
    assert scene_path("fixture", "ep_001", "BATCH_001").exists()
    assert not scene_manifest_path("fixture", "ep_001", "BATCH_001").exists()

    assert [p.name for p in list_scenes("fixture", "ep_001")] == ["ep_001_BATCH_001.json"]
    ref = batch_selector.resolve("EP001_CONT_001", "fixture")
    assert ref.scene_id == "BATCH_001"
    assert ref.scene_path == scene_path("fixture", "ep_001", "BATCH_001")


def test_status_save_does_not_append_version(tmp_path, monkeypatch):
    """A take/run STATUS save (the completion/dispatch-state save_scene path) on a
    versioned batch must NOT append a version — proving status saves were NOT misrouted
    into write_scene_candidate; only the active body's status mutated."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    save_scene(Scene(scene_id="BATCH_001", beats=[Beat(beat_id="B0")]),
               scene_path("p", "ep_001", "BATCH_001"))
    SceneVersionStore("p", "ep_001").write_scene_candidate(
        "BATCH_001", Scene(scene_id="BATCH_001", beats=[Beat("B0"), Beat("B1")]))
    before = load_manifest("p", "ep_001", "BATCH_001")
    n_versions_before = len(before["versions"])
    assert before["active_version"] == 1

    # REC-231 Phase 4: the canonical in-place status persister is
    # save_active_scene_status (the runner's _save_scene_for_run was retired). A take
    # STATUS change persists onto the active body and never touches the manifest.
    save_active_scene_status(
        "p", "ep_001", "BATCH_001", expected_version=1,
        mutate=lambda s: s.beats[0].new_take(workflow=_make_wf("status")),
    )

    after = load_manifest("p", "ep_001", "BATCH_001")
    assert len(after["versions"]) == n_versions_before  # NO new version appended
    assert after["active_version"] == 1                  # pointer unchanged
    assert len(load_scene_active("p", "ep_001", "BATCH_001").beats[0].takes) == 1


def test_flat_structural_delta_fails_closed(tmp_path, monkeypatch):
    """Writer 2 is strict for flat bodies too: status mutates cannot change structure."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    flat = scene_path("p", "ep_001", "BATCH_001")
    save_scene(
        Scene(scene_id="BATCH_001", beats=[Beat("B0", beat_metadata={"shot": {"id": "s0"}})]),
        flat,
    )
    before = flat.read_bytes()

    def _structural(scene):
        scene.beats[0].beat_metadata["prompt_directive"] = "new prompt"

    with pytest.raises(SceneStructureImmutableError):
        save_active_scene_status("p", "ep_001", "BATCH_001", expected_version=1, mutate=_structural)
    assert flat.read_bytes() == before


def test_missing_registered_body_fails_closed(tmp_path, monkeypatch):
    """A manifest entry without its registered body is corruption, not a cold start."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    manifest = {
        "schema_version": SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION,
        "batch_id": "BATCH_001",
        "active_version": 2,
        "versions": [
            _version_entry(2, "ep_001_BATCH_001.v002.json",
                           state="approved", downstream="derived"),
        ],
    }
    _write_manifest("p", "ep_001", "BATCH_001", manifest)

    with pytest.raises(FileNotFoundError):
        save_active_scene_status(
            "p", "ep_001", "BATCH_001", expected_version=2,
            mutate=lambda s: s.beats.append(Beat("B0")),
        )


def test_init_restamps_locked_scene(tmp_path, monkeypatch):
    """scene.locked is inert UX metadata; init still restamps non-structural max_takes."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    locked = Scene(
        scene_id="BATCH_001",
        beats=[Beat("B0", max_takes=2)],
        locked=True,
        lock_reason="operator review",
    )
    save_scene(locked, scene_path("p", "ep_001", "BATCH_001"))

    plan = {"sequences": {"BATCH_001": {"shots": [{"shot_id": "B0"}]}}}
    scenes = init_scenes_from_plan("p", "ep_001", plan, max_takes=5)

    assert scenes[0].locked is True
    assert scenes[0].beats[0].max_takes == 5
    assert load_scene_active("p", "ep_001", "BATCH_001").beats[0].max_takes == 5


# ── REC-231 Phase 2: behavioral tests driving the LIVE run_episode_batches site ──

from pathlib import Path  # noqa: E402

from recoil.pipeline._lib.plan_loader import CanonicalPlan, CanonicalShot  # noqa: E402
from recoil.pipeline.orchestrator.episode_runner import EpisodeRunner  # noqa: E402


def _rederive_shot(batch_index: int, shot_index: int) -> CanonicalShot:
    ordinal = (batch_index - 1) * 3 + shot_index
    return CanonicalShot(
        shot_id=f"EP001_SH{ordinal:02d}",
        scene_index=batch_index,
        sequence_id=None,
        pipeline="video",
        previs_model=None,
        video_model="seeddance-2.0",
        location_id=f"LOC_{batch_index}",
        characters=[],
        shot_type="MS",
        duration_s=2.0,
        is_env_only=False,
        has_dialogue=False,
        aspect_ratio="9:16",
        quality=None,
        cinematography=None,
        raw={},
    )


def _rederive_plan() -> CanonicalPlan:
    shots = [_rederive_shot(b, s) for b in range(1, 5) for s in range(1, 4)]
    return CanonicalPlan(
        episode_id="ep_001",
        project="fixture",
        shots=shots,
        source_path=Path("fixture_plan.json"),
        raw={"episode_id": "ep_001", "project": "fixture",
             "shots": [{"shot_id": shot.shot_id} for shot in shots]},
    )


def _rederive_runner(plan: CanonicalPlan) -> EpisodeRunner:
    return EpisodeRunner(project="fixture", plan=plan.raw, casting={},
                         episode="ep_001", concurrency=1)


def _configure_rederive_root(tmp_path, monkeypatch) -> None:
    (tmp_path / ".recoil-data-root").write_text("recoil-data-root\n", encoding="utf-8")
    (tmp_path / "fixture").mkdir()
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))


def _trap_run_scene():
    async def _run_scene(scene, **kwargs):  # noqa: ANN001, ANN003
        raise AssertionError("run_scene must not be called in derive_only mode")
    return _run_scene


def test_rederive_append_via_runner(tmp_path, monkeypatch):
    """LIVE path: the run_episode_batches derive_only write appends a candidate (v2),
    leaves the flat/v1 body byte-untouched, and never moves the pointer."""
    _configure_rederive_root(tmp_path, monkeypatch)
    flat = scene_path("fixture", "ep_001", "BATCH_001")
    save_scene(Scene(scene_id="BATCH_001", beats=[Beat(beat_id="OPENING")]), flat)
    flat_before = flat.read_bytes()

    plan = _rederive_plan()
    runner = _rederive_runner(plan)
    monkeypatch.setattr(runner, "run_scene", _trap_run_scene())

    asyncio.run(runner.run_episode_batches(
        plan, derive_only=True, only_scene_ids={"BATCH_001"}))

    assert scene_version_path("fixture", "ep_001", "BATCH_001", 2).exists()
    assert flat.read_bytes() == flat_before  # flat/v1 body byte-untouched
    manifest = load_manifest("fixture", "ep_001", "BATCH_001")
    assert manifest["active_version"] == 1  # pointer never moved
    assert manifest["versions"][0]["source"] == "legacy_flat"
    assert manifest["versions"][1]["state"] == "candidate"


def test_rederive_dry_run_no_write_creates_no_version(tmp_path, monkeypatch):
    """LIVE path (REC-100): a derive_only dry-run persists NOTHING — no manifest, no
    version body — and leaves a pre-existing flat scene byte-untouched.

    (Named to match BOTH the phase gate's `dry_run_no_write` -k token and acceptance
    test #9's `..._creates_no_version` intent — the spec's two references differ.)"""
    _configure_rederive_root(tmp_path, monkeypatch)
    flat = scene_path("fixture", "ep_001", "BATCH_001")
    save_scene(Scene(scene_id="BATCH_001", beats=[Beat(beat_id="OPENING")]), flat)
    flat_before = flat.read_bytes()

    plan = _rederive_plan()
    runner = _rederive_runner(plan)
    monkeypatch.setattr(runner, "run_scene", _trap_run_scene())

    asyncio.run(runner.run_episode_batches(
        plan, derive_only=True, dry_run=True, only_scene_ids={"BATCH_001"}))

    assert not scene_manifest_path("fixture", "ep_001", "BATCH_001").exists()
    assert not scene_version_path("fixture", "ep_001", "BATCH_001", 2).exists()
    assert flat.read_bytes() == flat_before


def test_derive_only_full_run_does_not_stamp_scenes_stage(tmp_path, monkeypatch):
    """A full derive_only pass stages candidate versions only; it must not stamp the
    episode-level scenes stage fresh for inactive bodies."""
    _configure_rederive_root(tmp_path, monkeypatch)
    for index in range(1, 5):
        batch_id = f"BATCH_{index:03d}"
        save_scene(
            Scene(scene_id=batch_id, beats=[Beat(beat_id=f"OLD_{batch_id}")]),
            scene_path("fixture", "ep_001", batch_id),
        )
    derivation_manifest.stamp_stage(
        "fixture",
        1,
        "scenes",
        kind="derived",
        content_sha="prior-scenes-sha",
        structural_sha=None,
        source={"plan_structural_sha": "prior-plan-sha"},
        builder="test.prior",
        built_at="2026-01-01T00:00:00+00:00",
    )
    stage_before = derivation_manifest.load("fixture", 1)["stages"]["scenes"]

    plan = _rederive_plan()
    runner = _rederive_runner(plan)
    monkeypatch.setattr(runner, "run_scene", _trap_run_scene())

    result = asyncio.run(runner.run_episode_batches(plan, derive_only=True))

    assert result["written"] == [
        "BATCH_001",
        "BATCH_002",
        "BATCH_003",
        "BATCH_004",
    ]
    assert derivation_manifest.load("fixture", 1)["stages"]["scenes"] == stage_before


def test_normal_full_run_still_stamps_scenes_stage(tmp_path, monkeypatch):
    """The derive_only guard must not suppress normal active-scene writes."""
    _configure_rederive_root(tmp_path, monkeypatch)
    plan = _rederive_plan()
    runner = _rederive_runner(plan)

    async def _no_dispatch(scene, **kwargs):  # noqa: ANN001, ANN003
        return scene

    monkeypatch.setattr(runner, "run_scene", _no_dispatch)
    monkeypatch.setattr(
        "recoil.pipeline.orchestrator.episode_runner._preflight_board_gate",
        lambda **kwargs: None,
    )

    scenes = asyncio.run(runner.run_episode_batches(plan, dry_run=False))

    assert [scene.scene_id for scene in scenes] == [
        "BATCH_001",
        "BATCH_002",
        "BATCH_003",
        "BATCH_004",
    ]
    stage = derivation_manifest.load("fixture", 1)["stages"]["scenes"]
    assert stage["kind"] == "derived"
    assert stage["builder"] == "episode_runner.scenes"


# ─────────────────────────────────────────────────────────────────────────────
# REC-231 Phase 5: provenance stamp + freshness gate (block → re-board → unblock)
# ─────────────────────────────────────────────────────────────────────────────


def _p5_downstream(manifest, version):
    return next(v for v in manifest["versions"] if v["version"] == version)["downstream"]


def _p5_versioned_batch(*, conform_to=None):
    """Flat v1 (legacy/derived) + an appended v2 candidate (born not_derived); the
    pointer stays at v1 unless ``conform_to`` is given. Project 'p', episode 'ep_001'."""
    save_scene(Scene(scene_id="BATCH_001", beats=[Beat(beat_id="OPENING")]),
               scene_path("p", "ep_001", "BATCH_001"))
    store = SceneVersionStore("p", "ep_001")
    store.write_scene_candidate(
        "BATCH_001", Scene(scene_id="BATCH_001", beats=[Beat("A"), Beat("B")]))
    if conform_to is not None:
        store.conform("BATCH_001", conform_to)
    return store


def test_provenance_stamp_carries_version(tmp_path, monkeypatch):
    """The REAL per-beat stamp site (_build_workflow_for_beat) stamps {batch_id,
    scene_version, beat_id} into global_provenance, with scene_version == the ACTIVE
    version (v2 after a conform) — not the newest, not v1. Exercises the live stamp
    site, not a hand-built dict; existing provenance keys are preserved (additive)."""
    import dataclasses
    import recoil.pipeline._lib.dispatch_payload as dispatch_payload
    from recoil.pipeline._lib.plan_loader import CanonicalShot
    from recoil.pipeline.orchestrator.episode_runner import EpisodeRunner

    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    store = _p5_versioned_batch(conform_to=2)  # active = v2
    assert active_version(load_manifest("p", "ep_001", "BATCH_001")) == 2

    shot = CanonicalShot(
        shot_id="EP001_SH01", scene_index=1, sequence_id=None, pipeline="video",
        previs_model=None, video_model="seeddance-2.0", location_id="LOC_A",
        characters=[], shot_type="MS", duration_s=2.0, is_env_only=False,
        has_dialogue=False, aspect_ratio="9:16", raw={},
    )
    beat = Beat(
        beat_id="EP001_SH01__cov",
        beat_metadata={
            "scene_id": "BATCH_001",
            "modality": "r2v_multi",
            "shot": dataclasses.asdict(shot),
            "batch_shots": [dataclasses.asdict(shot)],
            "grouping": {"strategy": "continuity", "ordinal": 1,
                         "shot_ids": ["EP001_SH01"], "source_pass_id": None},
        },
    )

    def _fake_payload(**kwargs):
        return {"shot_id": kwargs["shot"].shot_id, "prompt": "p",
                "reference_images": [], "grouping": dict(kwargs["grouping"])}

    monkeypatch.setattr(dispatch_payload, "build_dispatch_payload", _fake_payload)

    runner = EpisodeRunner(project="p", plan={}, episode="ep_001")
    wf = runner._build_workflow_for_beat(beat, take_index=0, beat_index=0)

    assert wf.global_provenance["batch_id"] == "BATCH_001"
    assert wf.global_provenance["scene_version"] == 2          # the ACTIVE version
    assert wf.global_provenance["beat_id"] == "EP001_SH01__cov"
    # additive — existing provenance keys preserved
    assert wf.global_provenance["scene_id"] == "BATCH_001"
    assert wf.global_provenance["shot_id"] == "EP001_SH01__cov"


def test_provenance_stamp_flat_batch_is_v1(tmp_path, monkeypatch):
    """For a flat (un-versioned) batch the stamp records scene_version == 1."""
    import dataclasses
    import recoil.pipeline._lib.dispatch_payload as dispatch_payload
    from recoil.pipeline._lib.plan_loader import CanonicalShot
    from recoil.pipeline.orchestrator.episode_runner import EpisodeRunner

    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    save_scene(Scene(scene_id="BATCH_001", beats=[Beat("OPENING")]),
               scene_path("p", "ep_001", "BATCH_001"))
    assert load_manifest("p", "ep_001", "BATCH_001") is None  # flat, no manifest

    shot = CanonicalShot(
        shot_id="EP001_SH01", scene_index=1, sequence_id=None, pipeline="video",
        previs_model=None, video_model="seeddance-2.0", location_id="LOC_A",
        characters=[], shot_type="MS", duration_s=2.0, is_env_only=False,
        has_dialogue=False, aspect_ratio="9:16", raw={},
    )
    beat = Beat(beat_id="EP001_SH01", beat_metadata={
        "scene_id": "BATCH_001", "modality": "video_i2v",
        "shot": dataclasses.asdict(shot)})
    monkeypatch.setattr(dispatch_payload, "build_dispatch_payload",
                        lambda **k: {"shot_id": k["shot"].shot_id, "reference_images": []})

    runner = EpisodeRunner(project="p", plan={}, episode="ep_001")
    wf = runner._build_workflow_for_beat(beat, take_index=0, beat_index=0)
    assert wf.global_provenance["batch_id"] == "BATCH_001"
    assert wf.global_provenance["scene_version"] == 1


def test_freshness_cycle_block_reboard_unblock(tmp_path, monkeypatch):
    """Full lifecycle: v2 appended born not_derived → conform(v2) PRESERVES not_derived
    (conform never touches downstream) → _preflight_board_gate (beats carry
    beat_metadata['scene_id']==batch_id) resolves beat→batch and BLOCKS dispatch → a
    successful re-board calls mark_derived(batch, 2) → the gate UNBLOCKS. Proves
    not_derived is BOTH consumed by the gate AND clearable (no permanent block)."""
    from recoil.pipeline.orchestrator.episode_runner import (
        BoardGateError,
        _preflight_board_gate,
    )

    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    store = _p5_versioned_batch(conform_to=2)  # active = v2, born not_derived
    manifest = load_manifest("p", "ep_001", "BATCH_001")
    assert manifest["active_version"] == 2
    assert _p5_downstream(manifest, 2) == "not_derived"  # conform left it not_derived

    beats = [Beat(beat_id="EP001_SH01",
                  beat_metadata={"scene_id": "BATCH_001", "modality": "r2v_multi"})]

    # the gate CONSUMES not_derived → BLOCKS dispatch
    with pytest.raises(BoardGateError) as exc:
        _preflight_board_gate(project="p", episode="ep_001", beats=beats)
    assert exc.value.reason == "active_version_not_derived"

    # a successful re-board clears not_derived (the sole clearer)
    store.mark_derived("BATCH_001", 2)
    assert _p5_downstream(load_manifest("p", "ep_001", "BATCH_001"), 2) == "derived"

    # the gate now UNBLOCKS (no raise)
    _preflight_board_gate(project="p", episode="ep_001", beats=beats)


def test_preflight_gate_fails_loud_on_unmapped_beat(tmp_path, monkeypatch):
    """A beat missing beat_metadata['scene_id'] makes _preflight_board_gate RAISE — it
    does NOT silently skip the freshness check (an unmapped beat must not bypass the
    not_derived block). Proves the beat→batch mapping is fail-loud."""
    from recoil.pipeline.orchestrator.episode_runner import _preflight_board_gate

    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    beats = [Beat(beat_id="EP001_SH01", beat_metadata={"modality": "r2v_multi"})]  # no scene_id
    with pytest.raises(ValueError, match="scene_id"):
        _preflight_board_gate(project="p", episode="ep_001", beats=beats)


def test_freshness_blocks_even_when_board_gate_disabled(tmp_path, monkeypatch):
    """With board_gate_enabled(project, episode) False AND an active not_derived version,
    _preflight_board_gate STILL blocks — the scene-version freshness check runs BEFORE
    the board_gate_enabled early-return. The disabled board-APPROVAL gate cannot bypass
    scene-version freshness."""
    from recoil.pipeline.orchestrator import episode_runner as er
    from recoil.pipeline.orchestrator.episode_runner import (
        BoardGateError,
        _preflight_board_gate,
        board_gate_enabled,
    )

    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    monkeypatch.delenv("RECOIL_BOARD_GATE", raising=False)
    monkeypatch.setattr(er, "load_project_config", lambda _project: {})  # gate OFF
    _p5_versioned_batch(conform_to=2)  # active v2, not_derived
    assert board_gate_enabled("p", 1) is False  # the board-APPROVAL gate IS disabled

    beats = [Beat(beat_id="EP001_SH01",
                  beat_metadata={"scene_id": "BATCH_001", "modality": "r2v_multi"})]
    with pytest.raises(BoardGateError) as exc:
        _preflight_board_gate(project="p", episode="ep_001", beats=beats)
    assert exc.value.reason == "active_version_not_derived"


def test_board_output_carries_scene_version(tmp_path, monkeypatch):
    """Re-board the active version (v2) via the board_builder path: the persisted board
    sidecar provenance carries scene_version == 2 (the version it derived against,
    parallel to the runner's global_provenance stamp), AND the successful re-board calls
    mark_derived so v2's downstream flips to 'derived' (the gate would now unblock). The
    in-place board status targets the v2 body; v1 stays byte-untouched."""
    from recoil.pipeline._lib import board_builder as bb
    from recoil.pipeline.core.receipts import GenerationReceipt
    from recoil.pipeline.core.registry import MODALITY_STORYBOARD, RunResult

    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))

    def _shot():
        return {"shot_id": "EP001_SH01", "scene_index": 1, "duration_s": 1.0,
                "intent": "Beat action.",
                "asset_data": {"characters": [], "location_id": None},
                "spatial_data": {}}

    def _r2v_scene(beat_id):
        return Scene(
            scene_id="BATCH_001",
            beats=[Beat(beat_id=beat_id, beat_metadata={
                "scene_id": "BATCH_001", "modality": "r2v_multi",
                "shot": _shot(), "batch_shots": [_shot()],
                "batch_summary": {"shared_characters": [], "shared_location_id": None}})],
            scene_metadata={"episode": "ep_001", "project": "p"},
        )

    save_scene(_r2v_scene("V1"), scene_path("p", "ep_001", "BATCH_001"))
    store = SceneVersionStore("p", "ep_001")
    store.write_scene_candidate("BATCH_001", _r2v_scene("V2"))
    store.conform("BATCH_001", 2)  # active = v2, born not_derived
    v1_bytes = scene_path("p", "ep_001", "BATCH_001").read_bytes()
    assert _p5_downstream(load_manifest("p", "ep_001", "BATCH_001"), 2) == "not_derived"

    captured: dict = {}

    def _fake_dispatch(modality, payload, *, context):
        captured["payload"] = payload
        return GenerationReceipt(
            receipt_id="r", modality=MODALITY_STORYBOARD, caller_id="board_builder",
            project="p", episode=1, shot_id="BATCH_001",
            timestamp_utc="2026-06-22T00:00:00Z",
            run_result=RunResult(id="run", modality=MODALITY_STORYBOARD,
                                 output_path="/tmp/board.png", metadata={},
                                 success=True, error=None),
        )

    monkeypatch.setattr(bb, "dispatch", _fake_dispatch)
    monkeypatch.setattr(
        bb, "derive_settings",
        lambda segments, **k: [dict(s, setting=f"S{i}") for i, s in enumerate(segments, 1)])
    monkeypatch.setattr(bb.core_paths, "get_pipeline_config",
                        lambda: {"storyboard_iteration": {"quality": "high", "size": "full"}})

    result = bb.build_and_dispatch_board("p", 1, "EP001_CONT_001", step_runner=object())

    assert result["success"] is True
    # the board sidecar carries the ACTIVE scene version it was derived against
    assert captured["payload"]["sidecar_extra"]["scene_version"] == 2
    # the successful re-board cleared not_derived (mark_derived) — gate would unblock
    assert _p5_downstream(load_manifest("p", "ep_001", "BATCH_001"), 2) == "derived"
    # the in-place board status went to the active v2 body; v1 is byte-untouched
    assert scene_path("p", "ep_001", "BATCH_001").read_bytes() == v1_bytes
