"""REC-231 Phase 9 — terminal end-to-end versioning scenario (NO PAID SPEND).

The mandatory terminal gate's scenario half. ``audit_dispatch.py`` only audits payload
shapes + the dispatch registry — it never exercises append/conform/revert/routing/
freshness — so it cannot prove this build works. This file does: it drives the WHOLE
non-destructive-versioning lifecycle on the LIVE code path and asserts every link of the
pointer chain.

Spend discipline (NO Flora dispatch is reached anywhere):
  - The candidate APPEND runs through the live ``run_episode_batches`` re-derivation path
    with ``dry_run=False`` — a ``derive_only`` run never reaches ``run_scene``/``dispatch``
    (REC-100/REC-231), so it appends + halts before any spend. Dry-run is NOT used for the
    mutation because the runner's dry-run mode is read-only w.r.t. scene state (it would
    persist nothing, so append/conform/mark_derived could not be exercised).
  - ``conform``/``revert``/``mark_derived`` are called directly on the store.
  - The NON-derive generation passes (steps f/g/h) stub ``run_scene`` with a SYNTHETIC
    success (no real dispatch); the surrounding live flow — pointer resolution, the
    pre-dispatch save, and the post-run completion save — runs for real around the stub.
  - ``dry_run=True`` is reserved for the read-only dispatch-RESOLUTION checks (assert the
    dry-run dispatch resolves the ACTIVE body's version).
"""
from __future__ import annotations

import asyncio
from pathlib import Path

import pytest

from recoil.pipeline._lib.plan_loader import CanonicalPlan, CanonicalShot
from recoil.pipeline.core.persistence import (
    SceneStructureImmutableError,
    active_version,
    init_scenes_from_plan,
    load_manifest,
    load_scene,
    load_scene_active,
    save_active_scene_status,
    scene_path,
    scene_version_path,
    structure_fingerprint,
)
from recoil.pipeline.core.scene_version_store import SceneVersionStore
from recoil.pipeline.core.take import Beat
from recoil.pipeline.core.workflow import Workflow, WorkflowStep
from recoil.pipeline.orchestrator import batch_selector
from recoil.pipeline.orchestrator.episode_runner import (
    BoardGateError,
    EpisodeRunner,
    _preflight_board_gate,
)
from recoil.workspace import readmodel

PROJECT = "fixture"
EP = "ep_001"
BATCH = "BATCH_001"
SELECTOR = "EP001_CONT_001"
ARTIFACT_V1 = "prep/ep_001/storyboards/BATCH_001_v1.png"
ARTIFACT_V2 = "prep/ep_001/storyboards/BATCH_001_v2.png"


# ── fixture / builders ─────────────────────────────────────────────────────────
def _configure_root(tmp_path, monkeypatch) -> None:
    """A temp projects root with the fixture project. RECOIL_BOARD_GATE=0 disables the
    board-APPROVAL gate (scene-version freshness still runs unconditionally)."""
    (tmp_path / ".recoil-data-root").write_text("recoil-data-root\n", encoding="utf-8")
    project_root = tmp_path / PROJECT
    project_root.mkdir()
    (project_root / "project_config.json").write_text("{}")
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    monkeypatch.setenv("RECOIL_BOARD_GATE", "0")


def _shot(shot_index: int) -> CanonicalShot:
    """A continuity shot in scene 1 / LOC_1. Three of these cluster into ONE batch
    (>= DEFAULT_MIN_BATCH_SIZE) → a single r2v_multi ``BATCH_001`` group."""
    return CanonicalShot(
        shot_id=f"EP001_SH{shot_index:02d}", scene_index=1, sequence_id=None,
        pipeline="video", previs_model=None, video_model="seeddance-2.0",
        location_id="LOC_1", characters=[], shot_type="MS", duration_s=2.0,
        is_env_only=False, has_dialogue=False, aspect_ratio="9:16", quality=None,
        cinematography=None, raw={},
    )


def _plan(*, total_shots: int | None = None) -> CanonicalPlan:
    """A single-batch plan: 3 continuity shots → one r2v_multi ``BATCH_001``. The
    ``sequences`` entry seeds a DIFFERENT (2-beat) init flat so the first re-derive is a
    genuine structure change (v1 init-shape → v2 group-shape)."""
    shots = [_shot(i) for i in range(1, 4)]
    raw = {
        "episode_id": EP,
        "project": PROJECT,
        "shots": [{"shot_id": s.shot_id} for s in shots],
        "sequences": {BATCH: {"shots": [{"shot_id": "INIT_A"}, {"shot_id": "INIT_B"}]}},
    }
    if total_shots is not None:
        raw["total_shots"] = total_shots
    return CanonicalPlan(
        episode_id=EP, project=PROJECT, shots=shots,
        source_path=Path("plan.json"), raw=raw,
    )


def _runner(plan: CanonicalPlan, *, max_takes: int = 3) -> EpisodeRunner:
    return EpisodeRunner(
        project=PROJECT, plan=plan.raw, casting={}, episode=EP,
        concurrency=1, max_takes=max_takes,
    )


def _wf(wid: str = "wf") -> Workflow:
    return Workflow(
        workflow_id=wid,
        steps=[WorkflowStep(step_id="kf", modality="image_t2i", payload={"prompt": "x"})],
    )


def _trap_run_scene():
    """A run_scene that must never be reached — proves a derive_only pass appends without
    ever touching the dispatch path (no spend)."""
    async def _run(scene, **kwargs):  # noqa: ANN001, ANN003
        raise AssertionError("run_scene must not be reached on a derive_only pass")
    return _run


def _capturing_run_scene(record: dict, *, mutate=None):
    """An async run_scene STUB: captures the resolved (scene, expected_version) and
    optionally applies ``mutate`` to synthesize a dispatch outcome — NO Flora spend, no
    real dispatch. Stands in for run_scene at the ``run_episode_batches`` call site so the
    surrounding live flow (pointer resolution, pre-dispatch save, post-run completion
    save) runs for real around the stub."""
    async def _run(scene, **kwargs):  # noqa: ANN001, ANN003
        record["scene"] = scene
        record["expected_version"] = kwargs.get("expected_version")
        record["dry_run"] = kwargs.get("dry_run")
        if mutate is not None:
            mutate(scene)
        return scene
    return _run


def _reroll_dry(monkeypatch) -> dict:
    """Drive the CLI batch-reroll dispatch entry in dry-run (read-only, REC-100): it reads
    the ACTIVE version body via the pointer and returns an estimate without dispatching.
    ``_estimate_take_cost`` is stubbed so the estimate needs no model profiles."""
    from recoil.pipeline.cli import generate as gen

    monkeypatch.setattr(EpisodeRunner, "_estimate_take_cost", lambda self, beat: 2.5)
    return gen._run_batch_reroll(
        project=PROJECT, episode=1, batch=SELECTOR, strategy=None, seed=None,
        make_primary=False, budget_usd=25.0, dry_run=True,
    )


def _build_v2_active(monkeypatch, plan: CanonicalPlan) -> SceneVersionStore:
    """Live setup → a versioned batch with active = v2 (derived):
      init the Case-A flat v1 → append v2 via the LIVE derive_only re-derivation path
      (no dispatch) → conform to v2 → mark it derived. Returns the store."""
    init_scenes_from_plan(PROJECT, EP, plan.raw)
    runner = _runner(plan)
    monkeypatch.setattr(runner, "run_scene", _trap_run_scene())
    asyncio.run(
        runner.run_episode_batches(plan, derive_only=True, only_scene_ids={BATCH})
    )
    store = SceneVersionStore(PROJECT, EP)
    store.conform(BATCH, 2)
    store.mark_derived(BATCH, 2)
    return store


# ── the single end-to-end lifecycle test (steps a0, a–e) ─────────────────────────
def test_versioning_e2e_lifecycle(tmp_path, monkeypatch):
    """The whole pointer chain end-to-end on the LIVE path: Case-A discovery → live
    append → conform → freshness block → re-board/mark_derived → dispatch-resolution →
    lossless revert. Every reader observes the manifest pointer, never the newest body."""
    _configure_root(tmp_path, monkeypatch)
    plan = _plan()

    # (a0) Case-A discovery — init_scenes_from_plan creates a brand-new batch as a FLAT
    # file (no manifest); a manifest-only scene that flat-keyed discovery cannot find
    # CANNOT occur. Give v1 a pointed board so get_board has an artifact to resolve.
    init_scenes_from_plan(PROJECT, EP, plan.raw)
    assert scene_path(PROJECT, EP, BATCH).exists()
    assert load_manifest(PROJECT, EP, BATCH) is None  # genuinely Case-A (no manifest)
    save_active_scene_status(
        PROJECT, EP, BATCH, expected_version=1,
        mutate=lambda s: s.beats[0].set_board_proposed(ARTIFACT_V1, "a" * 64, 1),
    )
    assert load_manifest(PROJECT, EP, BATCH) is None  # a status save created NO manifest

    assert batch_selector.resolve(SELECTOR, PROJECT).scene_id == BATCH
    assert readmodel.get_board(SELECTOR, PROJECT).board_artifact == ARTIFACT_V1
    sv = readmodel.get_scene_versions(SELECTOR, PROJECT)
    assert sv.active_version == 1 and sv.newer_unpointed_versions == 0
    # a dispatch DRY-RUN resolves the Case-A flat as implicit v1 (read-only).
    rec = {}
    a0_runner = _runner(plan)
    monkeypatch.setattr(a0_runner, "run_scene", _capturing_run_scene(rec))
    asyncio.run(a0_runner.run_episode_batches(plan, dry_run=True, only_scene_ids={BATCH}))
    assert rec["expected_version"] == 1

    v1_flat_bytes = scene_path(PROJECT, EP, BATCH).read_bytes()

    # (a) append a candidate via the LIVE re-derivation write path (dry_run=False; a
    # derive_only run reaches no dispatch) → the pointer never moves (active stays v1).
    derive_runner = _runner(plan)
    monkeypatch.setattr(derive_runner, "run_scene", _trap_run_scene())
    asyncio.run(
        derive_runner.run_episode_batches(plan, derive_only=True, only_scene_ids={BATCH})
    )
    store = SceneVersionStore(PROJECT, EP)
    manifest = load_manifest(PROJECT, EP, BATCH)
    assert manifest["active_version"] == 1
    assert [v["version"] for v in manifest["versions"]] == [1, 2]
    assert manifest["versions"][0]["source"] == "legacy_flat"
    assert manifest["versions"][1]["state"] == "candidate"
    assert scene_path(PROJECT, EP, BATCH).read_bytes() == v1_flat_bytes  # v1 untouched
    v2_path = scene_version_path(PROJECT, EP, BATCH, 2)

    # (b) live readers still see v1 (the pointer never moved on append).
    assert readmodel.get_board(SELECTOR, PROJECT).board_artifact == ARTIFACT_V1
    assert readmodel.get_scene_versions(SELECTOR, PROJECT).active_version == 1
    assert _reroll_dry(monkeypatch)["error"] == "new_take_requires_single_r2v_multi_beat"

    # (c) conform(v2) — v2 is a real structure change vs v1 (1 r2v beat vs 2 init beats).
    store.conform(BATCH, 2)
    assert readmodel.get_scene_versions(SELECTOR, PROJECT).active_version == 2
    assert readmodel.get_board(SELECTOR, PROJECT).board_artifact is None  # v2 unboarded
    after = _reroll_dry(monkeypatch)  # reroll now reads v2 (single r2v) → estimate
    assert after["success"] is True and after["dry_run"] is True
    # the freshness gate BLOCKS dispatch: v2 is a conformed-to candidate, still not_derived.
    gate_beats = [
        Beat(beat_id=BATCH, beat_metadata={"scene_id": BATCH, "modality": "r2v_multi"})
    ]
    with pytest.raises(BoardGateError) as exc:
        _preflight_board_gate(project=PROJECT, episode=EP, beats=gate_beats)
    assert exc.value.reason == "active_version_not_derived"

    # (d) re-board the active v2 (sets its pointed board) → mark_derived clears not_derived.
    save_active_scene_status(
        PROJECT, EP, BATCH, expected_version=2,
        mutate=lambda s: s.beats[0].set_board_proposed(ARTIFACT_V2, "b" * 64, 1),
    )
    store.mark_derived(BATCH, 2)
    _preflight_board_gate(project=PROJECT, episode=EP, beats=gate_beats)  # UNBLOCKS
    assert readmodel.get_board(SELECTOR, PROJECT).board_artifact == ARTIFACT_V2
    # a dispatch DRY-RUN now resolves against v2's body (the active version it loaded).
    rec_d = {}
    d_runner = _runner(plan)
    monkeypatch.setattr(d_runner, "run_scene", _capturing_run_scene(rec_d))
    asyncio.run(d_runner.run_episode_batches(plan, dry_run=True, only_scene_ids={BATCH}))
    assert rec_d["expected_version"] == 2
    assert [b.beat_id for b in rec_d["scene"].beats] == [BATCH]  # the single r2v v2 beat
    v2_bytes = v2_path.read_bytes()

    # (e) revert(v1) → readers see v1 again, losslessly (only the pointer moved).
    store.revert(BATCH, 1)
    assert readmodel.get_scene_versions(SELECTOR, PROJECT).active_version == 1
    assert readmodel.get_board(SELECTOR, PROJECT).board_artifact == ARTIFACT_V1
    assert _reroll_dry(monkeypatch)["error"] == "new_take_requires_single_r2v_multi_beat"
    assert [b.beat_id for b in load_scene_active(PROJECT, EP, BATCH).beats] == [
        "INIT_A", "INIT_B"
    ]
    # lossless: neither version body was rewritten by conform/reboard/revert.
    assert scene_path(PROJECT, EP, BATCH).read_bytes() == v1_flat_bytes
    assert v2_path.read_bytes() == v2_bytes


# ── (f) NON-DERIVE generation-loop save (round-12 MAJOR) ─────────────────────────
def test_versioning_e2e_non_derive_completion_save(tmp_path, monkeypatch):
    """A non-derive generation pass's post-run completion save on a VERSIONED batch
    (active = v2) is STATUS-ONLY: (i) a status-only save mutates ONLY the active body's
    status — no new version, pointer unchanged, no .vNNN structure rewritten; (ii) an
    injected STRUCTURAL delta at that save is a hard invariant error
    (SceneStructureImmutableError), never a silent append. No spend — run_scene is stubbed
    to a synthetic success."""
    _configure_root(tmp_path, monkeypatch)
    plan = _plan(total_shots=999)  # large total → never triggers the scenes-complete stamp
    _build_v2_active(monkeypatch, plan)
    assert active_version(load_manifest(PROJECT, EP, BATCH)) == 2
    v1_bytes = scene_path(PROJECT, EP, BATCH).read_bytes()
    v2_path = scene_version_path(PROJECT, EP, BATCH, 2)
    n_versions = len(load_manifest(PROJECT, EP, BATCH)["versions"])

    # (i) status-only completion save: a synthetic take is a NON-structural status change.
    def _status_success(scene):
        scene.beats[0].new_take(workflow=_wf("synthetic-success"))

    runner = _runner(plan)
    monkeypatch.setattr(
        runner, "run_scene", _capturing_run_scene({}, mutate=_status_success)
    )
    asyncio.run(runner.run_episode_batches(plan, only_scene_ids={BATCH}))

    manifest = load_manifest(PROJECT, EP, BATCH)
    assert active_version(manifest) == 2                 # pointer unchanged
    assert len(manifest["versions"]) == n_versions       # NO new version appended
    assert scene_path(PROJECT, EP, BATCH).read_bytes() == v1_bytes  # v1 untouched
    assert load_scene(v2_path).beats[0].takes            # v2 status mutated in place
    v2_after_status = v2_path.read_bytes()

    # (ii) an injected STRUCTURAL delta at the post-run completion save raises a hard
    # invariant error — the post-dispatch save is status-only and must NOT silently append.
    def _structural(scene):
        scene.beats[0].beat_metadata["shot"] = {"shot_id": "MUTATED_STRUCTURE"}

    runner2 = _runner(plan)
    monkeypatch.setattr(
        runner2, "run_scene", _capturing_run_scene({}, mutate=_structural)
    )
    with pytest.raises(SceneStructureImmutableError):
        asyncio.run(runner2.run_episode_batches(plan, only_scene_ids={BATCH}))
    # write NOTHING on the structural delta: v2 byte-unchanged, still no new version.
    assert v2_path.read_bytes() == v2_after_status
    assert len(load_manifest(PROJECT, EP, BATCH)["versions"]) == n_versions
    assert active_version(load_manifest(PROJECT, EP, BATCH)) == 2


# ── (g) PRE-DISPATCH metadata-refresh save (round-14 + round-17 MAJOR) ────────────
def test_versioning_e2e_pre_dispatch_metadata_refresh(tmp_path, monkeypatch):
    """The pre-dispatch save on normal generation is content-branched. A status/metadata
    refresh that RECOMPUTES ``inputs_fingerprint`` (blank → fresh) leaves
    ``structure_fingerprint`` UNCHANGED, so it persists as STATUS on v2 (no new version,
    pointer unchanged) — a recomputed render-input digest must NEVER falsely append. The
    derive_only path at the same persist site appends a candidate (pointer unmoved) and
    does NOT mutate the v2 body."""
    _configure_root(tmp_path, monkeypatch)
    plan = _plan(total_shots=999)
    _build_v2_active(monkeypatch, plan)
    v2_path = scene_version_path(PROJECT, EP, BATCH, 2)
    v1_bytes = scene_path(PROJECT, EP, BATCH).read_bytes()
    fp_before = structure_fingerprint(load_scene(v2_path))
    n_versions = len(load_manifest(PROJECT, EP, BATCH)["versions"])
    # the live v2 body's beat carries a BLANK inputs_fingerprint (the stale case).
    assert "inputs_fingerprint" not in load_scene(v2_path).beats[0].beat_metadata

    # normal-generation pass: the dispatch refresh recomputes inputs_fingerprint to fresh.
    def _recompute_inputs_fp(scene):
        scene.beats[0].beat_metadata["inputs_fingerprint"] = "fresh-digest-recomputed"

    runner = _runner(plan)
    monkeypatch.setattr(
        runner, "run_scene", _capturing_run_scene({}, mutate=_recompute_inputs_fp)
    )
    asyncio.run(runner.run_episode_batches(plan, only_scene_ids={BATCH}))

    body = load_scene(v2_path)
    assert body.beats[0].beat_metadata["inputs_fingerprint"] == "fresh-digest-recomputed"
    assert structure_fingerprint(body) == fp_before      # NOT treated as a structural delta
    manifest = load_manifest(PROJECT, EP, BATCH)
    assert active_version(manifest) == 2                 # pointer unchanged
    assert len(manifest["versions"]) == n_versions       # NO false append on refresh
    assert scene_path(PROJECT, EP, BATCH).read_bytes() == v1_bytes  # v1 untouched

    # the from-script derive_only opt-in path at the SAME site dedups an unchanged
    # rebuild (pointer unmoved) and does NOT mutate the v2 body.
    v2_bytes = v2_path.read_bytes()
    derive_runner = _runner(plan)
    monkeypatch.setattr(derive_runner, "run_scene", _trap_run_scene())
    asyncio.run(
        derive_runner.run_episode_batches(
            plan,
            derive_only=True,
            dedup_candidate=True,
            only_scene_ids={BATCH},
        )
    )
    manifest = load_manifest(PROJECT, EP, BATCH)
    assert [v["version"] for v in manifest["versions"]] == [1, 2]  # unchanged rebuild deduped
    assert active_version(manifest) == 2                 # pointer unmoved by the append
    assert v2_path.read_bytes() == v2_bytes              # v2 body untouched by dedup


def test_scene_from_group_is_structure_deterministic(tmp_path, monkeypatch):
    _configure_root(tmp_path, monkeypatch)
    plan = _plan()
    runner = _runner(plan)
    groups = runner._scene_from_group(
        runner._group_from_batch(
            type("Batch", (), {
                "batch_id": BATCH,
                "shots": plan.shots,
                "shared_location_id": "LOC_1",
                "shared_characters": [],
                "total_duration_s": 6.0,
                "below_threshold": False,
            })(),
            fallback_ordinal=1,
        )
    )
    again = runner._scene_from_group(
        runner._group_from_batch(
            type("Batch", (), {
                "batch_id": BATCH,
                "shots": plan.shots,
                "shared_location_id": "LOC_1",
                "shared_characters": [],
                "total_duration_s": 6.0,
                "below_threshold": False,
            })(),
            fallback_ordinal=1,
        )
    )

    assert [b.beat_metadata for b in groups.beats] == [b.beat_metadata for b in again.beats]
    assert groups.scene_metadata == again.scene_metadata
    assert structure_fingerprint(groups) == structure_fingerprint(again)


def test_write_scene_candidate_dedups_against_active_recorded_fingerprint(
    tmp_path,
    monkeypatch,
):
    _configure_root(tmp_path, monkeypatch)
    plan = _plan()
    store = _build_v2_active(monkeypatch, plan)
    active_scene = load_scene_active(PROJECT, EP, BATCH)
    manifest_before = load_manifest(PROJECT, EP, BATCH)
    assert manifest_before["active_version"] == 2
    assert len(manifest_before["versions"]) == 2

    returned = store.write_scene_candidate(
        BATCH,
        active_scene,
        dedup_against_versions=True,
    )

    manifest_after = load_manifest(PROJECT, EP, BATCH)
    assert returned == 2
    assert manifest_after["active_version"] == 2
    assert len(manifest_after["versions"]) == 2


def test_write_scene_candidate_appends_real_render_change(tmp_path, monkeypatch):
    _configure_root(tmp_path, monkeypatch)
    plan = _plan()
    store = _build_v2_active(monkeypatch, plan)
    changed = load_scene_active(PROJECT, EP, BATCH)
    changed.beats[0].beat_metadata["shot"]["duration_s"] = 9.0

    returned = store.write_scene_candidate(BATCH, changed)

    manifest = load_manifest(PROJECT, EP, BATCH)
    assert returned == 3
    assert manifest["active_version"] == 2
    assert [v["version"] for v in manifest["versions"]] == [1, 2, 3]


# ── (h) RESUMED init_scenes_from_plan after conform (round-15 MAJOR) ──────────────
def test_versioning_e2e_resumed_init_after_conform(tmp_path, monkeypatch):
    """``run_episode_batches``'s startup ``init_scenes_from_plan`` re-stamps the V2 ACTIVE
    body's ``max_takes`` via the structure-guarded status writer while leaving the flat/v1
    body byte-untouched (no clobber, no new version, pointer stays 2). Exercises the real
    EpisodeRunner startup after a conform, not just the persistence unit."""
    _configure_root(tmp_path, monkeypatch)
    plan = _plan(total_shots=999)  # large total → never triggers the scenes-complete stamp
    _build_v2_active(monkeypatch, plan)
    v2_path = scene_version_path(PROJECT, EP, BATCH, 2)
    v1_bytes = scene_path(PROJECT, EP, BATCH).read_bytes()
    # the active v2 body's beat(s) currently carry the default max_takes (3).
    assert all(b.max_takes == 3 for b in load_scene(v2_path).beats)

    # drive the real runner startup with a NEW max_takes (init persists when not dry_run
    # AND only_scene_ids is None); run_scene is stubbed (no spend, no structural mutation).
    runner = _runner(plan, max_takes=5)
    monkeypatch.setattr(runner, "run_scene", _capturing_run_scene({}))
    asyncio.run(runner.run_episode_batches(plan))

    active = load_scene_active(PROJECT, EP, BATCH)
    assert all(b.max_takes == 5 for b in active.beats)              # v2 re-stamped
    assert scene_path(PROJECT, EP, BATCH).read_bytes() == v1_bytes  # flat/v1 untouched
    manifest = load_manifest(PROJECT, EP, BATCH)
    assert active_version(manifest) == 2                            # pointer stays 2
    assert len(manifest["versions"]) == 2                           # no new version


def test_sanitized_candidate_scene_is_structure_only():
    """REC-231 finding #6: the derive_only / coverage candidate is STRUCTURE-ONLY —
    no inherited takes/board/primary/approval/lock — while structural beat_metadata
    (shot/grouping/configs) survives; the source scene is not mutated."""
    from recoil.pipeline.orchestrator.episode_runner import _sanitized_candidate_scene
    from recoil.pipeline.core.take import Scene, Beat, Take
    from recoil.pipeline.core.workflow import Workflow, WorkflowStep

    wf = Workflow(workflow_id="w",
                  steps=[WorkflowStep(step_id="v", modality="video_t2v", payload={})])
    beat = Beat(beat_id="B0",
                beat_metadata={"shot": {"x": 1}, "grouping": {"g": 1},
                               "generation_config": {"tier": "pro"}})
    beat.takes = [Take("B0_take_0", 0, wf, status="succeeded")]
    beat.primary_take_id = "B0_take_0"
    beat.board = {"status": "approved"}
    beat.approved = True
    scene = Scene(scene_id="BATCH_001", beats=[beat], scene_metadata={"e": 1})
    scene.locked = True

    cand = _sanitized_candidate_scene(scene)
    cb = cand.beats[0]
    assert cb.takes == [] and cb.primary_take_id is None
    assert cb.board is None and cb.approved is False
    assert cand.locked is False
    assert cb.beat_metadata["shot"] == {"x": 1}
    assert cb.beat_metadata["generation_config"] == {"tier": "pro"}
    # source untouched
    assert [t.take_id for t in scene.beats[0].takes] == ["B0_take_0"]
