"""REC-231 Phase 4 — route every live scene reader/writer through the active pointer.

Behavioral routing tests: each proves a live path OBSERVES the active-version pointer
(reads resolve the active body; in-place STATUS writes target the active body via the
structure-guarded ``save_active_scene_status``), plus the static anti-bypass gates that
forbid any unallowlisted raw scene/manifest access.

────────────────────────────────────────────────────────────────────────────────────
INVENTORY (grounding grep, classified) — every live scene reader/writer routed here:

  grep -rIn "scene_path(|load_scene(|save_scene(|list_scenes(|orchestration_scenes_dir" \
    recoil --include='*.py' | grep -vE "/tests/|/test_|core/persistence.py"

  site                                            class  routing
  ──────────────────────────────────────────────  ─────  ────────────────────────────
  workspace/readmodel.py:get_board                 (a)    load_scene_active
  workspace/board.py:build_episode_board           (a)    list_scenes + load_scene_active
  workspace/board.py:_resolve_board_sidecar        (a)    beat.board.artifact (pointed)
  workspace/server.py:_episode_board_mtime_key      (a)    list_scenes + manifest mtime
  orchestrator/from_script_target.py:79/111         (a)    active_scene_body_path/active
  pipeline/api/routes/reroll.py:118/120             (a)    load_scene_active_with_version
  pipeline/cli/generate.py:_run_batch_reroll 564    (a)    load_scene_active(+_with_version)
  pipeline/_lib/board_builder.py:273/649/1163       (a)    load_scene_active[_with_version]
  api/engine_routes.py:_list_scenes                 (a)    synthetic identity adapter (safe)
  pipeline/cli/generate.py:_run_scene_lock          (b)    save_active_scene_status
  pipeline/cli/generate.py:_run_board_decision      (b)    save_active_scene_status
  pipeline/cli/generate.py:_run_revalidate_board    (b)    save_active_scene_status
  pipeline/_lib/board_builder.py:610/1200           (b)    save_active_scene_status
  orchestrator/episode_runner.py status saves       (b)    _persist_active_status → "
  persistence.py:init_scenes_from_plan re-stamp     (b)    save_active_scene_status
  orchestrator/episode_runner.py derive_only persist (c)   write_scene_candidate (Phase 2)
  tools/honor_rate_probe.py:235/310                 tool   NON-production — NOT routed
  orchestrator/batch_selector.py:resolve glob       owner  selector→scene_id identity (flat)

  (a) read that must observe the pointer  (b) in-place status write on the active body
  (c) re-derivation appender (Phase 2)
────────────────────────────────────────────────────────────────────────────────────
"""
from __future__ import annotations

import ast
import json
import re
import threading
from pathlib import Path

import pytest

from recoil.pipeline._lib.derivation_sha import shotset_hash
from recoil.pipeline.core.persistence import (
    SceneStructureImmutableError,
    SceneVersionConflictError,
    active_scene_body_path,
    init_scenes_from_plan,
    load_manifest,
    load_scene,
    load_scene_active,
    load_scene_active_with_version,
    save_active_scene_status,
    save_scene,
    scene_path,
    scene_version_path,
)
from recoil.pipeline.core.scene_version_store import SceneVersionStore
from recoil.pipeline.core.take import Beat, Scene
from recoil.pipeline.core.workflow import Workflow, WorkflowStep
from recoil.workspace import readmodel

EP = "ep_001"
BATCH = "BATCH_001"
SELECTOR = "EP001_CONT_001"
_GROUPING = {
    "strategy": "continuity",
    "ordinal": 1,
    "shot_ids": ["EP001_SH01"],
    "shotset_hash": shotset_hash(["EP001_SH01"]),
}


# ── fixtures / builders ──────────────────────────────────────────────────────────
def _project(tmp_path, monkeypatch):
    root = tmp_path / "projects"
    root.mkdir()
    (root / ".recoil-data-root").write_text("recoil-data-root\n")
    proj_root = root / "fixture"
    proj_root.mkdir()
    (proj_root / "project_config.json").write_text("{}")
    scenes = proj_root / "_pipeline" / "state" / "orchestration" / "scenes"
    storyboards = proj_root / "prep" / EP / "storyboards"
    scenes.mkdir(parents=True)
    storyboards.mkdir(parents=True)
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(root))
    return "fixture", proj_root, scenes, storyboards


def _wf(wid="wf"):
    return Workflow(
        workflow_id=wid,
        steps=[WorkflowStep(step_id="kf", modality="image_t2i", payload={"prompt": "x"})],
    )


def _board(artifact, status="proposed"):
    return {
        "status": status,
        "artifact": artifact,
        "source_sha256": "d" * 64,
        "approved_by": None,
        "updated_at": "2026-06-22T00:00:00Z",
        "fingerprint_version": 1,
    }


def _shot(seg, dur=1.5):
    return {"segment_id": seg, "setting": "kitchen", "duration_s": dur}


def _r2v_beat(beat_id, *, board=None, batch_shots=None, grouping=None):
    md = {"modality": "r2v_multi", "grouping": dict(grouping or _GROUPING)}
    if batch_shots is not None:
        md["batch_shots"] = batch_shots
    return Beat(beat_id=beat_id, board=board, beat_metadata=md)


def _scene(beats, scene_id=BATCH):
    return Scene(scene_id=scene_id, beats=beats,
                 scene_metadata={"grouping": dict(_GROUPING)})


def _make_versioned(project, v1_scene, v2_scene, *, batch_id=BATCH, conform_to=None):
    """Flat v1 + an appended v2 candidate (pointer at v1); optionally conform to N."""
    save_scene(v1_scene, scene_path(project, EP, batch_id))
    store = SceneVersionStore(project, EP)
    store.write_scene_candidate(batch_id, v2_scene)
    if conform_to is not None:
        store.conform(batch_id, conform_to)
    return store


# ── 1. get_board observes the active pointer ─────────────────────────────────────
def test_get_board_observes_active_pointer(tmp_path, monkeypatch):
    project, *_ = _project(tmp_path, monkeypatch)
    v1 = _scene([_r2v_beat("V1", board=_board("prep/ep_001/storyboards/BATCH_001_v1.png"))])
    v2 = _scene([_r2v_beat("V2", board=_board("prep/ep_001/storyboards/BATCH_001_v2.png"))])
    store = _make_versioned(project, v1, v2)

    assert readmodel.get_board(SELECTOR, project).board_artifact == (
        "prep/ep_001/storyboards/BATCH_001_v1.png"
    )
    store.conform(BATCH, 2)
    assert readmodel.get_board(SELECTOR, project).board_artifact == (
        "prep/ep_001/storyboards/BATCH_001_v2.png"
    )


# ── 2. workspace board wall + mtime-key observe the pointer ──────────────────────
def _wall_scene(artifact):
    return _scene([_r2v_beat("B", board=_board(artifact), batch_shots=[_shot("EP001_SH01")])])


def test_workspace_board_wall_observes_pointer(tmp_path, monkeypatch):
    project, _root, _scenes, storyboards = _project(tmp_path, monkeypatch)
    for v, status in ((1, "proposed"), (2, "approved")):
        (storyboards / f"BATCH_001_v{v}.png").write_bytes(b"x" * 2048)
        (storyboards / f"BATCH_001_v{v}.png.json").write_text(json.dumps({"status": status}))
    store = _make_versioned(
        project,
        _wall_scene("prep/ep_001/storyboards/BATCH_001_v1.png"),
        _wall_scene("prep/ep_001/storyboards/BATCH_001_v2.png"),
    )

    from recoil.workspace import board as ws_board
    from recoil.workspace import server as ws_server

    ids = ws_board.normalize_episode("EP001")
    key_before = ws_server._episode_board_mtime_key(project, ids)
    batch = ws_board.build_episode_board(project, "EP001")["batches"][0]
    assert batch["board_artifact"] == "prep/ep_001/storyboards/BATCH_001_v1.png"
    assert batch["version"] == 1

    store.conform(BATCH, 2)
    key_after = ws_server._episode_board_mtime_key(project, ids)
    assert key_after != key_before  # manifest-only conform busts the cache

    batch = ws_board.build_episode_board(project, "EP001")["batches"][0]
    assert batch["board_artifact"] == "prep/ep_001/storyboards/BATCH_001_v2.png"
    assert batch["version"] == 2


def test_episode_board_mtime_key_observes_active_version_body_status(tmp_path, monkeypatch):
    project, _root, _scenes, storyboards = _project(tmp_path, monkeypatch)
    (storyboards / "BATCH_001_v2.png").write_bytes(b"x" * 2048)
    (storyboards / "BATCH_001_v2.png.json").write_text(
        json.dumps({"status": "proposed"}),
        encoding="utf-8",
    )
    _make_versioned(
        project,
        _wall_scene("prep/ep_001/storyboards/BATCH_001_v1.png"),
        _wall_scene("prep/ep_001/storyboards/BATCH_001_v2.png"),
        conform_to=2,
    )

    from recoil.workspace import board as ws_board
    from recoil.workspace import server as ws_server

    ids = ws_board.normalize_episode("EP001")
    key_before = ws_server._episode_board_mtime_key(project, ids)

    save_active_scene_status(
        project,
        EP,
        BATCH,
        expected_version=2,
        mutate=lambda s: s.beats[0].set_board_proposed(
            artifact="prep/ep_001/storyboards/BATCH_001_v2.png",
            source_sha256="e" * 64,
            fingerprint_version=1,
        ),
    )

    assert ws_server._episode_board_mtime_key(project, ids) != key_before


# ── 2b. board wall dereferences the pointed board, not the newest sidecar ────────
def test_board_wall_ignores_newer_unpointed_sidecar(tmp_path, monkeypatch):
    project, _root, _scenes, storyboards = _project(tmp_path, monkeypatch)
    for v in (1, 9):  # v9 is a NEWER, UNPOINTED sidecar on disk
        (storyboards / f"BATCH_001_v{v}.png").write_bytes(b"x" * 2048)
        (storyboards / f"BATCH_001_v{v}.png.json").write_text(json.dumps({"status": "proposed"}))
    # flat v1 active; its beat points at board v1 (NOT the newest v9)
    save_scene(_wall_scene("prep/ep_001/storyboards/BATCH_001_v1.png"),
               scene_path(project, EP, BATCH))

    from recoil.workspace import board as ws_board

    batch = ws_board.build_episode_board(project, "EP001")["batches"][0]
    assert batch["board_artifact"] == "prep/ep_001/storyboards/BATCH_001_v1.png"
    assert batch["version"] == 1


# ── 3. reroll API reads the active body ──────────────────────────────────────────
def test_reroll_api_reads_active_body(tmp_path, monkeypatch):
    project, *_ = _project(tmp_path, monkeypatch)
    # v1: TWO beats (fails the single-beat reroll contract); v2: one valid r2v beat.
    v1 = _scene([_r2v_beat("A"), _r2v_beat("B")])
    v2 = _scene([_r2v_beat("V2", batch_shots=[_shot("EP001_SH01")])])
    store = _make_versioned(project, v1, v2)

    from recoil.pipeline.api.routes import reroll as reroll_mod

    class _FakeRunner:
        def _estimate_take_cost(self, beat):
            return 2.5

    monkeypatch.setattr(reroll_mod, "_runner_for_reroll", lambda p, e: _FakeRunner())
    body = {"project": project, "episode": 1, "batch_id": SELECTOR, "dry_run": True}

    # active = v1 (two beats) → the route reads it and rejects the contract
    resp = reroll_mod.reroll(body)
    assert json.loads(resp.body)["error"] == "batch_not_single_beat" or resp.status_code == 422

    store.conform(BATCH, 2)
    resp = reroll_mod.reroll(body)  # active = v2 (single r2v) → dry-run estimate
    assert resp.status_code == 200
    assert "budget_estimate_usd" in json.loads(resp.body)


# ── 3b. CLI batch-reroll reads the active body (no-spend dry-run) ─────────────────
def test_cli_batch_reroll_reads_active_body(tmp_path, monkeypatch):
    project, *_ = _project(tmp_path, monkeypatch)
    v1 = _scene([_r2v_beat("A"), _r2v_beat("B")])          # fails single-beat contract
    v2 = _scene([_r2v_beat("V2", batch_shots=[_shot("EP001_SH01")])])
    store = _make_versioned(project, v1, v2)

    from recoil.pipeline.cli import generate as gen
    from recoil.pipeline.orchestrator.episode_runner import EpisodeRunner

    monkeypatch.setattr(EpisodeRunner, "_estimate_take_cost", lambda self, beat: 2.5)

    def _run():
        return gen._run_batch_reroll(
            project=project, episode=1, batch=SELECTOR, strategy=None, seed=None,
            make_primary=False, budget_usd=25.0, dry_run=True,
        )

    assert _run()["error"] == "new_take_requires_single_r2v_multi_beat"  # read v1
    store.conform(BATCH, 2)
    after = _run()  # read v2 (single r2v) → dry-run estimate, no dispatch
    assert after["success"] is True
    assert after["dry_run"] is True


# ── 4. CLI board decision targets the active body ────────────────────────────────
def test_cli_board_decision_targets_active_body(tmp_path, monkeypatch):
    project, _root, _scenes, storyboards = _project(tmp_path, monkeypatch)
    (storyboards / "BATCH_001_v2.png").write_bytes(b"x" * 2048)
    board = _board("prep/ep_001/storyboards/BATCH_001_v2.png")
    v1 = _scene([_r2v_beat("V1", board=_board("prep/ep_001/storyboards/BATCH_001_v1.png"),
                           batch_shots=[_shot("EP001_SH01")])])
    v2 = _scene([_r2v_beat("V2", board=board, batch_shots=[_shot("EP001_SH01")])])
    _make_versioned(project, v1, v2, conform_to=2)
    v1_bytes = scene_path(project, EP, BATCH).read_bytes()

    from recoil.pipeline.cli import generate as gen

    # Stub the orthogonal board-sha + SSOT machinery; this test is about pointer routing.
    monkeypatch.setattr(gen, "_compute_board_source_sha256",
                        lambda project, batch, beat: beat.board["source_sha256"])
    monkeypatch.setattr(gen, "_stamp_board_ssot", lambda *a, **k: None)

    code, result = gen._run_board_decision(
        project=project, episode=1, batch=SELECTOR, decision="reject", reason="x", route=None,
    )
    # the v2 (active) body now carries the rejected board; v1 is byte-untouched
    active = load_scene_active(project, EP, BATCH)
    assert active.beats[0].board["status"] == "rejected"
    assert scene_path(project, EP, BATCH).read_bytes() == v1_bytes
    assert load_manifest(project, EP, BATCH)["active_version"] == 2


def test_board_decision_save_failure_leaves_no_orphan_stamp(tmp_path, monkeypatch):
    project, _root, _scenes, storyboards = _project(tmp_path, monkeypatch)
    artifact = "prep/ep_001/storyboards/BATCH_001_v1.png"
    (storyboards / "BATCH_001_v1.png").write_bytes(b"x" * 2048)
    save_scene(
        _wall_scene(artifact),
        scene_path(project, EP, BATCH),
    )

    from recoil.pipeline.cli import generate as gen

    stamps: list[tuple] = []
    monkeypatch.setattr(gen, "_compute_board_source_sha256", lambda *a, **k: "d" * 64)
    monkeypatch.setattr(gen, "_stamp_board_ssot", lambda *a, **k: stamps.append(a))

    def _raise_save(*args, **kwargs):
        raise RuntimeError("save failed")

    monkeypatch.setattr(gen, "save_active_scene_status", _raise_save)

    with pytest.raises(RuntimeError, match="save failed"):
        gen._run_board_decision(
            project=project,
            episode=1,
            batch=SELECTOR,
            decision="approve",
            reason="ok",
            route=None,
        )

    assert stamps == []


def test_board_decision_post_save_stamp_failure_rolls_back_body_and_ssot(
    tmp_path,
    monkeypatch,
):
    project, _root, _scenes, storyboards = _project(tmp_path, monkeypatch)
    artifact = "prep/ep_001/storyboards/BATCH_001_v1.png"
    (storyboards / "BATCH_001_v1.png").write_bytes(b"x" * 2048)
    path = scene_path(project, EP, BATCH)
    save_scene(_wall_scene(artifact), path)
    before = path.read_bytes()

    from recoil.pipeline._lib import derivation_manifest
    from recoil.pipeline.cli import generate as gen

    monkeypatch.setattr(gen, "_compute_board_source_sha256", lambda *a, **k: "d" * 64)

    def _raise_after_body_save(project_arg, episode_arg, beat, **_kwargs):  # noqa: ANN001
        assert project_arg == project
        assert episode_arg == 1
        assert load_scene_active(project, EP, BATCH).beats[0].board["status"] == "approved"
        raise RuntimeError("stamp failed after body save")

    monkeypatch.setattr(gen, "_stamp_board_ssot", _raise_after_body_save)

    with pytest.raises(RuntimeError, match="stamp failed after body save"):
        gen._run_board_decision(
            project=project,
            episode=1,
            batch=SELECTOR,
            decision="approve",
            reason="ok",
            route=None,
        )

    assert path.read_bytes() == before
    assert derivation_manifest.load(project, 1)["execution"]["boards"] == {}


# ── 5. CLI revalidate reads the active body ──────────────────────────────────────
def test_cli_revalidate_and_finish_target_active_body(tmp_path, monkeypatch):
    project, *_ = _project(tmp_path, monkeypatch)
    # v1: grouping has no derivable shotset_hash; v2: grouping carries one.
    v1 = _scene([_r2v_beat("V1", grouping={"strategy": "continuity", "ordinal": 1})])
    v2 = _scene([_r2v_beat("V2", grouping={"strategy": "continuity", "ordinal": 1,
                                            "shotset_hash": "abc123"})])
    store = _make_versioned(project, v1, v2)

    from recoil.pipeline.cli import generate as gen

    # active = v1 → no derivable shotset_hash (reads v1's beat)
    _code, result = gen._run_revalidate_board(project=project, episode=1, batch=SELECTOR)
    assert result["error"] == "board_shotset_hash_missing"

    store.conform(BATCH, 2)
    # active = v2 → shotset_hash present, so it advances to the SSOT lookup (reads v2)
    _code, result = gen._run_revalidate_board(project=project, episode=1, batch=SELECTOR)
    assert result["error"] == "no_prior_approval"


# ── 6. runner in-place status save targets the active body ───────────────────────
def test_runner_status_write_targets_active_body(tmp_path, monkeypatch):
    project, *_ = _project(tmp_path, monkeypatch)
    _make_versioned(project, _scene([Beat("B0")]),
                    _scene([Beat("B0")]), conform_to=2)
    v1_bytes = scene_path(project, EP, BATCH).read_bytes()

    from recoil.pipeline.orchestrator.episode_runner import EpisodeRunner

    runner = EpisodeRunner(project=project, plan={}, episode=EP)
    scene = load_scene_active(project, EP, BATCH)
    scene.beats[0].new_take(workflow=_wf("status"))  # a take STATUS change
    runner._persist_active_status(scene, expected_version=2)

    assert len(load_scene_active(project, EP, BATCH).beats[0].takes) == 1  # v2 mutated
    assert scene_path(project, EP, BATCH).read_bytes() == v1_bytes        # v1 untouched
    assert load_manifest(project, EP, BATCH)["active_version"] == 2


# ── 7. in-place status write via active_scene_body_path targets v2 ───────────────
def test_inplace_status_write_targets_active_body(tmp_path, monkeypatch):
    project, *_ = _project(tmp_path, monkeypatch)
    _make_versioned(project, _scene([_r2v_beat("B0")]),
                    _scene([_r2v_beat("B0")]), conform_to=2)
    v1_bytes = scene_path(project, EP, BATCH).read_bytes()
    v2_path = active_scene_body_path(project, EP, BATCH)
    assert v2_path == scene_version_path(project, EP, BATCH, 2)  # active = the v2 body

    save_active_scene_status(
        project, EP, BATCH, expected_version=2,
        mutate=lambda s: s.beats[0].set_board_proposed(
            artifact="prep/ep_001/storyboards/BATCH_001_v2.png",
            source_sha256="a" * 64, fingerprint_version=1),
    )
    assert load_scene(v2_path).beats[0].board["status"] == "proposed"   # v2 mutated
    assert scene_path(project, EP, BATCH).read_bytes() == v1_bytes      # v1 untouched


# ── 8. flat (un-versioned) batch behaves byte-identically ────────────────────────
def test_flat_batch_unchanged(tmp_path, monkeypatch):
    project, *_ = _project(tmp_path, monkeypatch)
    save_scene(_scene([_r2v_beat("B0", board=_board("prep/ep_001/storyboards/BATCH_001_v1.png"))]),
               scene_path(project, EP, BATCH))
    assert not load_manifest(project, EP, BATCH)  # no manifest

    # get_board reads the flat body exactly as before
    assert readmodel.get_board(SELECTOR, project).board_artifact == (
        "prep/ep_001/storyboards/BATCH_001_v1.png"
    )
    # an in-place status write (expected_version=1) targets the flat file itself
    save_active_scene_status(
        project, EP, BATCH, expected_version=1,
        mutate=lambda s: s.beats[0].new_take(workflow=_wf("t")),
    )
    assert load_scene(scene_path(project, EP, BATCH)).beats[0].takes  # the flat body mutated
    assert not load_manifest(project, EP, BATCH)  # still no manifest (no flag-day)


# ── 8b. init_scenes existing-versioned re-stamp targets the active body ──────────
def test_init_scenes_existing_versioned_targets_active_body(tmp_path, monkeypatch):
    project, *_ = _project(tmp_path, monkeypatch)
    _make_versioned(project, _scene([Beat("B0", max_takes=3)]),
                    _scene([Beat("B0", max_takes=3)]), conform_to=2)
    v1_bytes = scene_path(project, EP, BATCH).read_bytes()

    plan = {"sequences": {BATCH: {"shots": [{"shot_id": "EP001_SH01"}]}}}
    init_scenes_from_plan(project, EP, plan, max_takes=5)

    active = load_scene_active(project, EP, BATCH)
    assert all(b.max_takes == 5 for b in active.beats)              # v2 re-stamped
    assert scene_path(project, EP, BATCH).read_bytes() == v1_bytes  # v1 byte-untouched
    manifest = load_manifest(project, EP, BATCH)
    assert manifest["active_version"] == 2
    assert len(manifest["versions"]) == 2                           # no new version


# ── 9. ANTI-BYPASS gate: no unallowlisted raw scene calls / dir scanners ─────────
_RECOIL_ROOT = Path(__file__).resolve().parents[3]

_RAW_SCENE_ALLOWLIST = {
    # canonical owners with reasons:
    "recoil/pipeline/core/persistence.py",          # sole home of raw save_scene + list_scenes
    "recoil/pipeline/core/scene_version_store.py",  # the manifest owner
    "recoil/pipeline/orchestrator/batch_selector.py",  # selector→scene_id identity (flat by design)
    "recoil/pipeline/tools/honor_rate_probe.py",    # non-production diagnostic tool
    "recoil/api/adapters/beats.py",                 # unrelated synthetic list_scenes symbol
}
# bare CALL of a raw body primitive — NOT load_scene_active / save_active_scene_status /
# active_scene_body_path / scene_manifest_path / scene_version_path / manifest_artifact_path.
_RAW_CALL = re.compile(r"\b(scene_path|load_scene|save_scene)\s*\(")
_DEF = re.compile(r"\bdef\s+(scene_path|load_scene|save_scene|list_scenes|save_active_scene_status)\b")
_DIR_VAR_FROM_SCENES = re.compile(r"(\w+)\s*=\s*[^\n]*orchestration_scenes_dir")
_SCAN_CALL = re.compile(r"\.\s*(glob|iterdir|rglob)\s*\(")


def _production_py_files():
    for p in sorted(_RECOIL_ROOT.rglob("*.py")):
        rel = p.relative_to(_RECOIL_ROOT.parent).as_posix()
        if "/tests/" in rel or p.name.startswith("test_"):
            continue
        yield rel, p


def test_no_unallowlisted_raw_scene_calls():
    """Every production reader/writer routes through the pointer — no bare scene_path/
    load_scene/save_scene CALL and no raw orchestration_scenes_dir glob outside the
    sanctioned owners. (list_scenes is the sanctioned manifest-aware listing; routed
    callers use load_scene_active / save_active_scene_status / active_scene_body_path /
    SceneVersionStore / list_scenes.)"""
    raw_hits: list[str] = []
    scan_hits: list[str] = []
    for rel, p in _production_py_files():
        if rel in _RAW_SCENE_ALLOWLIST:
            continue
        text = p.read_text(encoding="utf-8")
        dir_vars: set[str] = set()
        for i, line in enumerate(text.splitlines(), start=1):
            code = line.split("#", 1)[0]
            if _RAW_CALL.search(code) and not _DEF.search(code):
                raw_hits.append(f"{rel}:{i}: {line.strip()}")
            m = _DIR_VAR_FROM_SCENES.search(code)
            if m:
                dir_vars.add(m.group(1))
            if re.search(r"orchestration_scenes_dir\s*\.\s*(glob|iterdir|rglob)\s*\(", code):
                scan_hits.append(f"{rel}:{i}: {line.strip()}")
            for v in dir_vars:
                if re.search(rf"\b{re.escape(v)}\s*\.\s*(glob|iterdir|rglob)\s*\(", code):
                    scan_hits.append(f"{rel}:{i}: {line.strip()}")
    assert not raw_hits, "unallowlisted raw scene-body calls:\n" + "\n".join(raw_hits)
    assert not scan_hits, "raw orchestration_scenes_dir scanners:\n" + "\n".join(scan_hits)


# ── 9b. structure-immutability is an ENFORCED save-time invariant ────────────────
def test_status_save_rejects_structural_mutation(tmp_path, monkeypatch):
    project, *_ = _project(tmp_path, monkeypatch)
    _make_versioned(project, _scene([Beat("B0", beat_metadata={"shot": {"id": "s0"}})]),
                    _scene([Beat("B0", beat_metadata={"shot": {"id": "s0"}})]), conform_to=2)
    v2_path = scene_version_path(project, EP, BATCH, 2)
    before = v2_path.read_bytes()

    def _structural(scene):
        scene.beats[0].beat_metadata["shot"] = {"id": "MUTATED"}  # a STRUCTURAL change

    with pytest.raises(SceneStructureImmutableError):
        save_active_scene_status(project, EP, BATCH, expected_version=2, mutate=_structural)
    assert v2_path.read_bytes() == before  # byte-unchanged: write NOTHING on a delta

    # a pure STATUS mutation through the same API succeeds + persists
    save_active_scene_status(project, EP, BATCH, expected_version=2,
                             mutate=lambda s: s.beats[0].new_take(workflow=_wf("ok")))
    assert load_scene(v2_path).beats[0].takes


# ── 9d. active-version recheck closes the conform-during-in-flight TOCTOU ─────────
def test_status_save_rejects_stale_version_after_conform(tmp_path, monkeypatch):
    project, *_ = _project(tmp_path, monkeypatch)
    # v2 has the SAME structure as v1 (fingerprints coincide) — only the VERSION differs.
    store = _make_versioned(project, _scene([Beat("B0")]), _scene([Beat("B0")]))
    scene_v1, loaded_version = load_scene_active_with_version(project, EP, BATCH)
    assert loaded_version == 1

    store.conform(BATCH, 2)  # an operator moves the pointer mid-flight
    v2_path = scene_version_path(project, EP, BATCH, 2)
    v2_before = v2_path.read_bytes()

    with pytest.raises(SceneVersionConflictError):
        save_active_scene_status(
            project, EP, BATCH, expected_version=1,  # the stale loaded version
            mutate=lambda s: s.beats[0].new_take(workflow=_wf("stale")),
        )
    assert v2_path.read_bytes() == v2_before  # the stale v1 Scene is NOT written into v2


# ── 9c. manifest-SSOT anti-bypass: no scene-version manifest write outside the owner ─
_MANIFEST_ALLOWLIST = {
    "recoil/pipeline/core/scene_version_store.py",  # the SOLE manifest mutator
    "recoil/pipeline/core/persistence.py",          # manifest READ/PATH primitives only
}
_ACTIVE_VERSION_ASSIGN = re.compile(r"""\[\s*["']active_version["']\s*\]\s*=[^=]""")
_MANIFEST_PATH_WRITE = re.compile(
    r"(atomic_write_json|write_text)\s*\([^\n]*scene_manifest_path"
)
_VERSIONS_JSON_WRITE = re.compile(
    r"(atomic_write_json|write_text)\s*\([^\n]*\.versions\.json"
)


def test_no_unallowlisted_raw_manifest_mutation():
    """No production code WRITES a scene-version manifest (``*.versions.json`` /
    ``scene_manifest_path`` / an ``["active_version"]=`` assignment) outside the
    sanctioned SceneVersionStore owner."""
    hits: list[str] = []
    for rel, p in _production_py_files():
        if rel in _MANIFEST_ALLOWLIST:
            continue
        text = p.read_text(encoding="utf-8")
        # track variables that trace to a scene-version manifest read
        manifest_vars: set[str] = set()
        for i, line in enumerate(text.splitlines(), start=1):
            code = line.split("#", 1)[0]
            # ONLY the scene-version persistence.load_manifest — a word boundary keeps an
            # unrelated `_load_manifest` (e.g. tools/storyboard_version.py) from matching.
            m = re.search(r"(\w+)\s*=\s*[^\n]*\bload_manifest\s*\(", code)
            if m:
                manifest_vars.add(m.group(1))
            if _MANIFEST_PATH_WRITE.search(code) or _VERSIONS_JSON_WRITE.search(code):
                hits.append(f"{rel}:{i}: {line.strip()}")
            if _ACTIVE_VERSION_ASSIGN.search(code):
                hits.append(f"{rel}:{i}: {line.strip()}")
            for v in manifest_vars:
                if re.search(rf"\b{re.escape(v)}\s*\[\s*[\"']versions[\"']\s*\]\s*\.\s*append\s*\(", code):
                    hits.append(f"{rel}:{i}: {line.strip()}")
    assert not hits, "unallowlisted scene-version manifest writes:\n" + "\n".join(hits)


# ── lost-update: two concurrent status writers both survive ──────────────────────
def test_concurrent_status_writes_no_lost_update(tmp_path, monkeypatch):
    project, *_ = _project(tmp_path, monkeypatch)
    _make_versioned(project, _scene([_r2v_beat("B0")]),
                    _scene([_r2v_beat("B0")]), conform_to=2)

    def _set_take(scene):
        scene.beats[0].new_take(workflow=_wf("take"))

    def _set_board(scene):
        scene.beats[0].set_board_proposed(
            artifact="prep/ep_001/storyboards/BATCH_001_v2.png",
            source_sha256="b" * 64, fingerprint_version=1)

    barrier = threading.Barrier(2)

    def _run(mutate):
        barrier.wait()
        save_active_scene_status(project, EP, BATCH, expected_version=2, mutate=mutate)

    threads = [threading.Thread(target=_run, args=(m,)) for m in (_set_take, _set_board)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    body = load_scene_active(project, EP, BATCH)
    assert body.beats[0].takes, "the take-status writer's update survived"
    assert body.beats[0].board is not None, "the board-status writer's update survived"
