"""Beat persistence — atomic disk serialization for CP-7 Scene/Beat/Take trees.

Module home (ADR-0004): pipeline/core/persistence.py — pipeline owns its own
serialization because the models are pipeline-specific.

Atomicity protocol (POSIX-atomic):
  1. Serialize via scene.to_dict() → json.dumps(indent=2).
  2. Write to {path}.tmp.
  3. f.flush() then os.fsync(f.fileno()) (best-effort: an OSError from fsync is ignored) then f.close().
  4. os.replace(tmp, final) — atomic rename on POSIX.

Failure modes:
  - Disk full mid-write → .tmp partial; final untouched; next save overwrites .tmp.
  - Process killed mid-write → same (.tmp orphaned; final has last-good).
  - Stale .tmp from prior crash → unconditionally overwritten on next save.
  - Take in `running` status from prior crash → resume protocol stales-out
    after 5 min in EpisodeRunner.recover_stale_takes (NOT this module).

Schema versioning (Law 7): top-level "schema_version" field on every saved
file. Reader validates; mismatch raises KeyError. Bump on schema-breaking
field changes. Canonical constant: pipeline._lib.schema_versions.SCENE_SCHEMA_VERSION.
"""
from __future__ import annotations

import fcntl
import hashlib
import json
import os
import re
import copy
from pathlib import Path
from typing import Callable

from recoil.pipeline._lib.schema_versions import (
    SCENE_SCHEMA_VERSION as SCHEMA_VERSION,
    SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION,
)
from recoil.pipeline.core.take import Beat, Scene
from recoil.core.atomic_write import (  # Phase 20 — re-export the SSOT
    atomic_write_json,
    atomic_write_text,
    jsonl_append_locked,
)
from recoil.core.paths import ProjectPaths

# Star-import surface of this module. Only the three re-exported atomic-write
# helpers are listed, so `from <this module> import *` exports nothing else.
__all__ = [
    "atomic_write_json",
    "atomic_write_text",
    "jsonl_append_locked",
    # NOTE: scene_path, save_scene, load_scene (and, per the original note,
    # list_scenes / init_scenes_from_plan) are not listed here. They remain
    # importable by explicit name, but a defined __all__ is exhaustive for
    # star-imports, so they are NOT picked up by `import *`.
]


class SceneLockedError(Exception):
    """Signal that a scene write was refused because the scene is locked.

    Attributes:
        scene_id: Identifier of the locked scene.
        batch_id: Carries the same value as ``scene_id``.
        lock_reason: The reason recorded with the lock, or ``None``.
    """

    def __init__(self, scene_id: str, lock_reason: str | None) -> None:
        message = f"Scene {scene_id} is locked; lock_reason={lock_reason!r}"
        super().__init__(message)
        # Both attribute names expose the same identifier.
        self.scene_id = self.batch_id = scene_id
        self.lock_reason = lock_reason


class SceneStructureImmutableError(Exception):
    """Signal that an in-place status save tried to alter a version body's STRUCTURE.

    A status ``mutate`` is allowed to touch only non-structural fields (take/board
    status, approval, lock). Beat order, beat ids and structural ``beat_metadata``
    of an existing version body must stay untouched. Per REC-231 Phase 4 this is
    checked at SAVE time through :func:`structure_fingerprint`, making the rule an
    enforced gate rather than a documented convention.

    Attributes:
        batch_id: The batch whose body was being saved.
        version: The version number of that body.
    """

    def __init__(self, batch_id: str, version: int) -> None:
        message = (
            f"status save for batch {batch_id} v{version} would change the shot "
            "STRUCTURE of an immutable version body (structure_fingerprint changed)"
        )
        super().__init__(message)
        self.batch_id = batch_id
        self.version = version


class SceneVersionConflictError(Exception):
    """Signal that the active version changed between load and status-save (TOCTOU).

    A status save is aimed at the version the Scene was LOADED from, not at
    whichever version happens to be active at save time. When an operator
    conform/revert moves the pointer after the caller's atomic load
    (``load_scene_active_with_version``), expected and current versions disagree
    and the save is refused, so a stale Scene never lands in a newly-active
    version even when fingerprints coincide (REC-231 Phase 4).

    Attributes:
        batch_id: The batch being saved.
        expected_version: The version the caller loaded from.
        actual_version: The version found active at save time.
    """

    def __init__(self, batch_id: str, expected_version: int, actual_version: int) -> None:
        message = (
            f"active version for batch {batch_id} moved from expected "
            f"v{expected_version} to v{actual_version} between load and save"
        )
        super().__init__(message)
        self.batch_id = batch_id
        self.expected_version = expected_version
        self.actual_version = actual_version


class SceneIdentityMismatchError(Exception):
    """Signal that a re-derive GENERATED a scene_id other than the REQUESTED batch.

    Versions exist strictly INSIDE one fixed ``batch_id``. If the generated
    ``scene.scene_id`` is not the requested selector target, the re-derive is a
    RENUMBER (a new identity) and not a version of the requested batch. The store
    HALTS and writes NOTHING (no body, no manifest entry, no lock side effect);
    the renumber is reported and never auto-cascaded (REC-231 Phase 6).

    Attributes:
        requested_batch_id: The batch the caller asked to re-derive.
        generated_scene_id: The scene id the re-derive actually produced.
    """

    def __init__(self, requested_batch_id: str, generated_scene_id: str) -> None:
        message = (
            f"scene identity mismatch: re-derive of requested batch "
            f"{requested_batch_id!r} generated scene_id {generated_scene_id!r} — a "
            "renumber is a new identity, never a version (HALT, no write)"
        )
        super().__init__(message)
        self.requested_batch_id = requested_batch_id
        self.generated_scene_id = generated_scene_id


_EP_TOKEN_RE = re.compile(r"^ep_\d{3,}$")


def _canonical_episode_token(episode: int | str) -> str:
    """Normalize an episode identifier to the canonical scene-file token ``ep_NNN``.

    The scene-file namespace SSOT is ``ep_{NNN}_{seq}.json``. ``scene_path`` (build)
    and ``list_scenes`` (match) are the only places that touch that filename, so this
    is the single chokepoint that guarantees the namespace can never fork. A bare
    episode number — int ``1`` or str ``"1"`` — is coerced to ``"ep_001"``; this is
    the exact 2026-06-03 ``1_BATCH_*`` pollution bug, where ``EpisodeRunner`` passed
    ``self.episode`` (an int) straight through an unvalidated ``f"{episode}_…"``. An
    already-canonical token passes through; anything else raises loudly rather than
    silently writing a divergent filename.
    """
    if isinstance(episode, bool):
        raise ValueError(f"episode must be an int or 'ep_NNN', not bool {episode!r}")
    if isinstance(episode, int):
        if episode < 0:
            raise ValueError(
                f"negative episode {episode!r}: refusing to build a divergent "
                "scene path (episodes are non-negative)"
            )
        return f"ep_{episode:03d}"
    token = str(episode).strip()
    if _EP_TOKEN_RE.match(token):
        return token
    if token.isdigit():
        return f"ep_{int(token):03d}"
    raise ValueError(
        f"non-canonical episode token {episode!r}: expected an int or 'ep_NNN' "
        "(the scene-file namespace SSOT) — refusing to build a divergent scene path"
    )


def scene_path(project: str, episode: int | str, seq_id: str) -> Path:
    """Compute the flat scene-body path for ``seq_id`` without touching the filesystem.

    The basename is ``<episode token>_<seq_id>.json`` inside the project's
    orchestration scenes directory. ``episode`` always goes through
    :func:`_canonical_episode_token`, so a bare int/number cannot silently fork
    the ``1_BATCH_*`` namespace again.
    """
    scenes_dir = ProjectPaths.for_project(project).orchestration_scenes_dir
    episode_token = _canonical_episode_token(episode)
    return scenes_dir / f"{episode_token}_{seq_id}.json"


def _json_default(o):
    """JSON encoder fallback for Scene payloads.

    Path objects appear in payloads via `payload['start_frame']` (Bug C fix,
    2026-05-19 — see recoil/pipeline/_lib/dispatch_payload.py:368). The runner
    coerces back to Path via `_to_path()` so str-on-disk is round-trip safe.
    Matches the `default=str` pattern in audit_assertions.assert_payload_json_serializable.
    """
    if isinstance(o, Path):
        return str(o)
    raise TypeError(
        f"Object of type {o.__class__.__name__} is not JSON serializable"
    )


def _serialize_scene_dict(d: dict) -> str:
    """Render a raw Scene dict as indented JSON with ``schema_version`` leading.

    The envelope is seeded with the module's ``SCHEMA_VERSION`` and then updated
    with ``d``. A ``schema_version`` key already present in ``d`` therefore takes
    precedence over the constant (it keeps the first position but carries ``d``'s
    value). ``Path`` values are stringified by :func:`_json_default`.
    """
    envelope = {"schema_version": SCHEMA_VERSION}
    envelope.update(d)
    return json.dumps(envelope, indent=2, default=_json_default)


def _write_scene_dict(d: dict, path: Path) -> None:
    """Serialize a raw Scene dict and atomically install it at ``path``.

    Steps: create the parent directory, serialize via :func:`_serialize_scene_dict`,
    write the text to a sibling ``<name>.tmp`` file, flush, fsync (best-effort),
    then ``os.replace`` the temp file over ``path``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = _serialize_scene_dict(d)
    staging = path.with_suffix(path.suffix + ".tmp")
    with staging.open("w", encoding="utf-8") as handle:
        handle.write(payload)
        handle.flush()
        try:
            os.fsync(handle.fileno())
        except OSError:
            # fsync is best-effort: a failure here does not abort the save.
            pass
    os.replace(staging, path)


def _write_scene_bytes(data: bytes, path: Path) -> None:
    """Atomic restore of a previously-read Scene body."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    with open(tmp, "wb") as f:
        f.write(data)
        f.flush()
        try:
            os.fsync(f.fileno())
        except OSError:
            pass
    os.replace(tmp, path)


def _read_scene_dict(path: Path) -> dict:
    """Load a Scene body as raw JSON dict and validate schema without model hydration."""
    if not path.exists():
        raise FileNotFoundError(f"Scene JSON not found: {path}")
    raw = json.loads(path.read_text(encoding="utf-8"))
    sv = raw.pop("schema_version", None)
    if sv != SCHEMA_VERSION:
        raise KeyError(
            f"Scene JSON schema_version mismatch in {path}: "
            f"expected {SCHEMA_VERSION}, got {sv!r}"
        )
    return raw


def save_scene(scene: Scene, path: Path) -> None:
    """Persist ``scene`` at ``path`` atomically (tmp file → fsync → os.replace).

    The scene is converted with ``scene.to_dict()`` and handed to
    :func:`_write_scene_dict`, which performs the atomic write.
    """
    raw_scene = scene.to_dict()
    _write_scene_dict(raw_scene, path)


def save_scene_guarded(scene: Scene, path: Path) -> None:
    """Write ``scene`` to ``path`` unless a locked scene already lives there.

    REC-231 Phase 7: the force/locked-overwrite path (the deleted force-overwrite CLI
    flag's clobber + its ``.pre-force-*.bak`` backup of a locked body) is DELETED, so
    no silent overwrite path remains. Re-derivation is non-destructive (it appends a
    candidate version via ``SceneVersionStore``) and in-place status saves go through
    ``save_active_scene``/``save_active_scene_status``; this guard is therefore no
    longer on any live write path and ``scene.locked`` is inert UX metadata rather
    than a clobber gate. The function is kept purely as the lock-refusing primitive.

    Raises:
        SceneLockedError: An existing body at ``path`` is locked; nothing is written.
    """
    if path.exists():
        on_disk = load_scene(path)
        if on_disk.locked:
            raise SceneLockedError(on_disk.scene_id, on_disk.lock_reason)
    # Either there is no prior body, or the prior body is unlocked.
    save_scene(scene, path)


def load_scene(path: Path) -> Scene:
    """Load the Scene stored at ``path`` (the inverse of :func:`save_scene`).

    Raises:
      FileNotFoundError    — ``path`` does not exist.
      json.JSONDecodeError — the file is not valid JSON.
      KeyError             — ``schema_version`` is missing or does not match.
    """
    if path.exists():
        raw_scene = _read_scene_dict(path)
        return Scene.from_dict(raw_scene)
    raise FileNotFoundError(f"Scene JSON not found: {path}")


# ── Scene versioning (REC-231): per-batch manifest + pointer-faithful loader ──
# A per-batch sidecar manifest `ep_NNN_BATCH_NNN.versions.json` carries
# {schema_version, batch_id, active_version, versions[]}; each version body is a
# structure-immutable `ep_NNN_BATCH_NNN.vNNN.json` (or the flat `ep_NNN_BATCH_NNN.json`
# for a materialized-legacy v1). This phase is read + schema ONLY — no manifest is ever
# written here; absence of a manifest IS the v1-active default (no flag-day migration).
# All manifest MUTATION lives in SceneVersionStore (Phase 2); persistence holds only
# path/load/parse primitives.

# Matches a versioned scene body basename suffix `.vNNN.json` (any digit width).
# The pattern is anchored only at the END (`$`), so it is suited to `.search()` on
# a basename rather than `.match()`. No use of it is visible in this part of the file.
_VERSION_BODY_RE = re.compile(r"\.v\d+\.json$")


def scene_manifest_path(project: str, episode: int | str, batch_id: str) -> Path:
    """Compute the path of a batch's versions manifest `ep_NNN_BATCH_NNN.versions.json`.

    Path computation only; ``episode`` is normalized through
    :func:`_canonical_episode_token`.
    """
    scenes_dir = ProjectPaths.for_project(project).orchestration_scenes_dir
    episode_token = _canonical_episode_token(episode)
    return scenes_dir / f"{episode_token}_{batch_id}.versions.json"


def scene_version_path(
    project: str, episode: int | str, batch_id: str, version: int
) -> Path:
    """Compute the path of a versioned scene body `ep_NNN_BATCH_NNN.vNNN.json`.

    ``version`` is zero-padded to three digits. Path computation only. The WRITER
    (Phase 2) uses this to name new candidate bodies.
    """
    scenes_dir = ProjectPaths.for_project(project).orchestration_scenes_dir
    basename = f"{_canonical_episode_token(episode)}_{batch_id}.v{version:03d}.json"
    return scenes_dir / basename


def active_version(manifest: dict) -> int:
    """Return the manifest's ``active_version`` pointer.

    Raises:
        KeyError: The manifest has no ``active_version`` key.
    """
    pointer = manifest["active_version"]
    return pointer


def _manifest_entry_for_version(manifest: dict, version: int) -> dict:
    for entry in manifest.get("versions", []):
        if entry.get("version") == version:
            return entry
    raise ValueError(
        f"version {version} not registered in manifest for batch "
        f"{manifest.get('batch_id')!r}"
    )


def manifest_artifact_path(
    project: str, episode: int | str, batch_id: str, manifest: dict, version: int
) -> Path:
    """Map a version to its ON-DISK body using the manifest entry's recorded `artifact`.

    The recorded basename is honored as-is — the flat `ep_NNN_BATCH_NNN.json` for a
    materialized-legacy v1, or `ep_NNN_BATCH_NNN.vNNN.json` for a re-derived
    candidate — so a materialized legacy manifest cannot point at a file the loader
    does not read. The artifact must be a bare basename: a non-string, an empty
    string, any `/` or `\\`, any `..`, or a value whose `Path(...).name` differs
    from itself is rejected (path-safety, mirroring
    readmodel._safe_project_sidecar_path).

    Raises:
        ValueError: The version is not registered, or the artifact is unsafe.
        KeyError: The matching entry has no ``artifact`` key.
    """
    artifact = _manifest_entry_for_version(manifest, version)["artifact"]
    is_bare_basename = (
        isinstance(artifact, str)
        and artifact != ""
        and "/" not in artifact
        and "\\" not in artifact
        and ".." not in artifact
        and Path(artifact).name == artifact
    )
    if not is_bare_basename:
        raise ValueError(
            f"unsafe manifest artifact {artifact!r} for batch {batch_id} v{version}: "
            "must be a bare basename (no path separator or '..')"
        )
    scenes_dir = ProjectPaths.for_project(project).orchestration_scenes_dir
    return scenes_dir / artifact


def load_manifest(project: str, episode: int | str, batch_id: str) -> dict | None:
    """Read and validate a batch's versions manifest; ``None`` if the file is absent.

    Two checks run on every read, because manifest identity is never trusted:
    the `schema_version` must equal ``SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION``
    (else KeyError, mirroring load_scene), and `manifest["batch_id"]` must equal
    the requested ``batch_id`` (else ValueError — e.g. a copied/cross-batch file).
    """
    manifest_file = scene_manifest_path(project, episode, batch_id)
    if not manifest_file.exists():
        return None
    manifest = json.loads(manifest_file.read_text(encoding="utf-8"))

    found_version = manifest.get("schema_version")
    if found_version != SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION:
        raise KeyError(
            f"Scene versions manifest schema_version mismatch in {manifest_file}: "
            f"expected {SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION}, got {found_version!r}"
        )

    recorded_batch = manifest.get("batch_id")
    if recorded_batch != batch_id:
        raise ValueError(
            f"manifest identity mismatch in {manifest_file}: manifest batch_id "
            f"{recorded_batch!r} != requested {batch_id!r}"
        )
    return manifest


def load_scene_active_with_version(
    project: str, episode: int | str, batch_id: str
) -> tuple[Scene, int]:
    """Load the active Scene together with the version it came from, in one read.

    This is the TOCTOU-safe load-and-version primitive: the manifest is read ONCE
    and both the active body and its `active_version` are returned, so an in-place
    status writer can pass the exact loaded version to
    save_active_scene_status(expected_version=...) (Phase 4) instead of a value
    looked up separately, which a conform could change in between.

    Resolution ladder:
      (a) manifest present → load the body recorded for the active version and
          require `scene.scene_id == batch_id` (mismatch → ValueError);
          returns (scene, active_version);
      (b) no manifest, flat `scene_path(...)` exists → (load_scene(flat), 1),
          i.e. implicit v1 active;
      (c) neither → FileNotFoundError.
    """
    manifest = load_manifest(project, episode, batch_id)
    if manifest is None:
        flat = scene_path(project, episode, batch_id)
        if not flat.exists():
            raise FileNotFoundError(
                f"no scene for batch {batch_id} (episode {episode}): "
                f"no manifest and no flat {flat}"
            )
        return load_scene(flat), 1

    pointer = active_version(manifest)
    body_file = manifest_artifact_path(project, episode, batch_id, manifest, pointer)
    scene = load_scene(body_file)
    if scene.scene_id != batch_id:
        raise ValueError(
            f"active scene body identity mismatch for batch {batch_id} v{pointer}: "
            f"loaded scene_id {scene.scene_id!r}"
        )
    return scene, pointer


def load_scene_active(project: str, episode: int | str, batch_id: str) -> Scene:
    """Return just the active Scene, following the manifest pointer when one exists.

    A thin wrapper over :func:`load_scene_active_with_version` that drops the
    version, so existing readers keep their signature. Reading never creates a
    manifest — a missing manifest IS the v1-active default — so a flat-file scene
    loads equivalently to load_scene(scene_path(...)).
    """
    return load_scene_active_with_version(project, episode, batch_id)[0]


def active_scene_body_path(project: str, episode: int | str, batch_id: str) -> Path:
    """Tell WHERE the active scene body lives (the canonical active-body resolver).

    With a manifest, the answer is the recorded artifact path of its active
    version; without one, it is the flat `scene_path(...)` (implicit v1 active).
    In-place status writers (Phase 4) save to THIS path so that no caller
    re-implements the flat-vs-versioned fallback. Resolution only: no manifest is
    created, and manifest MUTATION stays exclusively in
    `scene_version_store.SceneVersionStore`.
    """
    manifest = load_manifest(project, episode, batch_id)
    if manifest is None:
        return scene_path(project, episode, batch_id)
    pointer = active_version(manifest)
    return manifest_artifact_path(project, episode, batch_id, manifest, pointer)


def assert_batch_identity(requested_batch_id: str, scene: Scene) -> None:
    """PURE identity guard: a re-derive has to stay INSIDE the batch it was asked for (REC-231).

    ``requested_batch_id`` is the PRE-derivation selector target; ``scene.scene_id``
    is the POST-derivation generated id. When they differ, the re-derive produced a
    DIFFERENT identity (a renumber/split) — a new batch, not a version of the
    requested one — and :class:`SceneIdentityMismatchError` is raised so the caller
    (:meth:`SceneVersionStore.write_scene_candidate`) HALTS before writing anything.
    Nothing here reads a manifest; it is validation only. (The comparison is
    non-tautological because the runner threads the requested id, not
    ``scene.scene_id``.)
    """
    generated_id = scene.scene_id
    renumbered = generated_id != requested_batch_id
    if renumbered:
        raise SceneIdentityMismatchError(requested_batch_id, generated_id)


# Structural beat_metadata keys, in canonical order — the prompt/render-determining
# fields. `prompt_directive` is refreshed from grouping (episode_runner.py:2544) and
# APPENDED to the live prompt (dispatch_payload.py:758); the beat-level `scene_id` is
# the beat's structural scene anchor. `inputs_fingerprint` is DELIBERATELY ABSENT: it is
# a VOLATILE field the live workflow recomputes every dispatch (_build_workflow_for_beat
# ~1485), so including it would make every ordinary pre-dispatch metadata refresh read as
# a "structural change" — falsely forcing a candidate append / status-save rejection on
# normal generation. A freshly-init beat carries `plan_shot`; a grouped/re-derived beat
# carries `shot`/`batch_shots` — BOTH structural, both included-when-present.
#
# Within this file the tuple is iterated by `_structural_sha` (fingerprint projection),
# `structural_mutations` (disk-vs-rebuild diff) and `_writer1_overlay_scene_dict`
# (enrichment of keys absent on disk), so adding or removing a key changes all three.
_STRUCTURAL_BEAT_METADATA_KEYS = (
    "plan_shot",
    "shot",
    "batch_shots",
    "batch_summary",
    "grouping",
    "modality",
    "generation_config",
    "element_config",
    "prompt_directive",
    "scene_id",
)

_BUILDER_VARIANT_SHOT_KEYS = {"is_env_only", "aspect_ratio", "raw"}


def canonical_shot_identity(shot_dict: dict) -> dict:
    """Reduce a shot dict to its identity for cross-builder Writer 1 guards.

    The projection is subtractive on purpose: every shot field counts as
    structural unless it is listed in ``_BUILDER_VARIANT_SHOT_KEYS`` (fields
    measured to be builder-variant representation noise). A falsy ``shot_dict``
    yields an empty dict. The result is a new, shallow dict in the input's key order.
    """
    source = shot_dict if shot_dict else {}
    identity = {}
    for field, payload in source.items():
        if field in _BUILDER_VARIANT_SHOT_KEYS:
            continue
        identity[field] = payload
    return identity


def structure_fingerprint(scene: Scene) -> str:
    """Hash a scene's canonical STRUCTURAL projection into a deterministic sha256 (REC-231).

    Only the prompt/render-determining shot structure is projected: the beat
    order, every beat's id plus the structural subset of its beat_metadata (the
    keys of `_STRUCTURAL_BEAT_METADATA_KEYS`, in that order, only when present),
    the scene_id, the ordered beat-id list and the shot_count (`len(scene.beats)`).
    Volatile/derived data is left OUT: `inputs_fingerprint` (recomputed every
    dispatch), `created_at`/timestamps, `approved`/selection state,
    `primary_take_id`, `takes` and all take state, `board` and its
    pointers/artifact paths, `phantom_recovery_count`, `max_takes`, and
    `scene.locked`/`lock_*`.

    Consequently an identical shot structure always hashes the same, while adding,
    removing or reordering a beat — or changing ANY included structural key —
    changes the hash. Changes to excluded fields do NOT move it, and that stability
    is load-bearing: Phase 4's `save_active_scene_status` compares this value to
    ENFORCE structure-immutability, so an ordinary in-place status/metadata refresh
    must pass while an in-place structural rewrite must be rejected.

    Nested structures are serialized with `sort_keys=True`, so the hash does not
    depend on nested-dict key order across re-derives.
    """
    scene_id = scene.scene_id
    beat_pairs = ((beat.beat_id, beat.beat_metadata) for beat in scene.beats)
    return _structural_sha(scene_id, beat_pairs)


def _structural_sha(scene_id, beat_items) -> str:
    """The canonical structural sha over (scene_id, [(beat_id, beat_metadata), ...]).

    Shared by :func:`structure_fingerprint` (from a Scene) and
    :func:`_disk_structure_fingerprint` (from raw on-disk JSON), so both produce an
    IDENTICAL hash for the same structure.
    """
    beats_projection = []
    beat_ids = []
    for beat_id, metadata in beat_items:
        metadata = metadata or {}
        structural = {
            key: metadata[key]
            for key in _STRUCTURAL_BEAT_METADATA_KEYS
            if key in metadata
        }
        beats_projection.append({"beat_id": beat_id, "beat_metadata": structural})
        beat_ids.append(beat_id)
    projection = {
        "beats": beats_projection,
        "scene_id": scene_id,
        "beat_ids": beat_ids,
        "shot_count": len(beat_ids),
    }
    canonical = json.dumps(projection, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def _project_structural_metadata_value(key: str, value):
    if key == "shot" and isinstance(value, dict):
        return canonical_shot_identity(value)
    if key == "batch_shots" and isinstance(value, list):
        return [
            canonical_shot_identity(item) if isinstance(item, dict) else item
            for item in value
        ]
    return value


def structural_mutations(disk_dict: dict, rebuild_dict: dict) -> list[str]:
    """List the structural differences between an on-disk raw dict and a runner rebuild.

    Writer 1 is cross-builder: the disk body may be minimally persisted while the
    runner's fresh `_scene_from_group` rebuild carries builder-variant shot
    representation. The guard therefore tolerates ENRICHMENT (a structural key
    absent on disk but present in the rebuild) and compares present
    shot/batch_shots values through `canonical_shot_identity`. Differences between
    present values, and removals, fail closed.

    Returns:
        Labels in detection order: ``"scene_id"``, ``"beat_topology"`` (returned
        immediately, without per-beat comparison), and per-beat
        ``"<beat_id>.<key>:removed"`` / ``"<beat_id>.<key>:changed"``.
    """
    found: list[str] = []
    if disk_dict.get("scene_id") != rebuild_dict.get("scene_id"):
        found.append("scene_id")

    disk_beats = disk_dict.get("beats") or []
    rebuild_beats = rebuild_dict.get("beats") or []
    ids_on_disk = [beat.get("beat_id") for beat in disk_beats]
    ids_rebuilt = [beat.get("beat_id") for beat in rebuild_beats]
    if ids_on_disk != ids_rebuilt:
        # Beat lists disagree, so pairing beats positionally would be meaningless.
        found.append("beat_topology")
        return found

    for position, (on_disk, rebuilt) in enumerate(zip(disk_beats, rebuild_beats)):
        disk_md = on_disk.get("beat_metadata") or {}
        rebuild_md = rebuilt.get("beat_metadata") or {}
        label = on_disk.get("beat_id", position)
        for key in _STRUCTURAL_BEAT_METADATA_KEYS:
            if key not in disk_md:
                # Absent on disk: either enrichment or absent on both sides.
                continue
            if key not in rebuild_md:
                found.append(f"{label}.{key}:removed")
                continue
            on_disk_value = _project_structural_metadata_value(key, disk_md[key])
            rebuilt_value = _project_structural_metadata_value(key, rebuild_md[key])
            if on_disk_value != rebuilt_value:
                found.append(f"{label}.{key}:changed")
    return found


def _stable_structural_enrichment_value(key: str, value):
    """Copy a structural value from the runner, excluding builder-variant shot fields."""
    value = copy.deepcopy(value)
    if key == "shot" and isinstance(value, dict):
        return canonical_shot_identity(value)
    if key == "batch_shots" and isinstance(value, list):
        return [
            canonical_shot_identity(item) if isinstance(item, dict) else item
            for item in value
        ]
    return value


def _writer1_overlay_scene_dict(
    disk_dict: dict,
    runner_dict: dict,
    *,
    allow_structural_enrichment: bool,
) -> dict:
    """Overlay runner-owned status onto a fresh disk body without clobbering disk structure."""
    merged = copy.deepcopy(disk_dict)
    runner_by_beat_id = {
        beat.get("beat_id"): beat for beat in (runner_dict.get("beats") or [])
    }
    for disk_beat in merged.get("beats") or []:
        beat_id = disk_beat.get("beat_id")
        runner_beat = runner_by_beat_id[beat_id]
        # Whole-array take replace is safe: Writer 1 (save_active_scene, sole caller
        # episode_runner ~:499) is the ONLY writer of `takes`. Every concurrent Writer 2
        # (save_active_scene_status) closure — board decision / reroll / photoreal finish /
        # revalidate — mutates `beat.board` ONLY, never `takes`, and this overlay starts from
        # copy.deepcopy(disk_dict) so it preserves that board. The two writers are field-disjoint
        # and serialized by the per-batch lock, so no take/status clobber is reachable. (Two
        # runners on one batch would race, but same-batch double-dispatch is not a supported
        # operation; if it ever became one, the guard belongs at the dispatch-concurrency layer.)
        disk_beat["takes"] = copy.deepcopy(runner_beat.get("takes") or [])
        disk_beat["primary_take_id"] = runner_beat.get("primary_take_id")
        disk_beat["phantom_recovery_count"] = runner_beat.get(
            "phantom_recovery_count", disk_beat.get("phantom_recovery_count", 0)
        )

        disk_md = disk_beat.setdefault("beat_metadata", {})
        runner_md = runner_beat.get("beat_metadata") or {}
        for key in ("inputs_fingerprint", "reroll_cleared_stale"):
            if key in runner_md:
                disk_md[key] = copy.deepcopy(runner_md[key])
        if allow_structural_enrichment:
            for key in _STRUCTURAL_BEAT_METADATA_KEYS:
                if key not in disk_md and key in runner_md:
                    disk_md[key] = _stable_structural_enrichment_value(key, runner_md[key])
    return merged


def _disk_structure_fingerprint(path: Path) -> str | None:
    """Structural fingerprint of an on-disk scene body from its RAW JSON — without
    deserializing takes/receipts.

    Used by :func:`save_active_scene` so the whole-scene structure guard never trips
    over an in-flight take's receipt that does not round-trip through ``load_scene``.
    Returns None when the body file is absent.
    """
    if not path.exists():
        return None
    raw = json.loads(path.read_text(encoding="utf-8"))
    beats = raw.get("beats") or []
    return _structural_sha(
        raw.get("scene_id"),
        ((b.get("beat_id"), b.get("beat_metadata")) for b in beats),
    )


def save_active_scene_status(
    project: str,
    episode: int | str,
    batch_id: str,
    *,
    expected_version: int,
    mutate: Callable[[Scene], None],
    post_write: Callable[[Scene], None] | None = None,
) -> None:
    """The single sanctioned in-place STATUS writer for the ACTIVE version body.

    Read-modify-write UNDER the per-batch ``ep_NNN_BATCH_NNN.versions.lock`` (the
    Phase 2/3 advisory lock), so recheck→write is not racy and two concurrent
    status writers cannot silently clobber each other's non-structural update:

      (1) **active-version recheck (TOCTOU):** re-read the manifest under the lock
          and assert the CURRENT ``active_version == expected_version``; a
          mismatch (an operator conform/revert moved the pointer between the
          caller's atomic load and this save) raises
          :class:`SceneVersionConflictError` and writes NOTHING — a stale
          version-loaded Scene is never written into a newly-active version.
      (2) resolve the body path for ``expected_version`` (the rechecked-equal
          version) via :func:`manifest_artifact_path`; for an un-versioned batch
          (no manifest) the active body IS the flat ``scene_path``.
      (3) **strict structure guard:** re-load that body FRESH, apply
          ``mutate(scene)`` to it, and compare :func:`structure_fingerprint`
          before/after; a STRUCTURAL delta raises
          :class:`SceneStructureImmutableError` and writes NOTHING. This applies
          to both flat and versioned existing bodies. A flat cold start with no
          body still materializes an empty scene before mutation.
      (4) only on BOTH a matching active-version AND no structural delta, write
          the mutated body via :func:`save_scene` (atomic). If ``post_write`` is
          provided, run it after that body write but before releasing this lock;
          a hook failure restores the prior body bytes before the exception
          escapes.

    ``mutate`` is a CLOSURE (not a pre-mutated Scene) so the read-modify-write
    merges onto the LATEST persisted state under the lock, never a stale snapshot
    (resolves the lost-update MAJOR). The caller MUST pass the ``expected_version``
    it obtained ATOMICALLY from :func:`load_scene_active_with_version` (never a
    separately-looked-up value, which would reopen the TOCTOU window) — ``1`` for a
    flat Case-A batch, which that primitive returns.

    Args:
        project: Project identifier used to resolve the scenes directory.
        episode: Episode number or ``ep_NNN`` token.
        batch_id: The batch whose active body is updated.
        expected_version: The version the caller loaded the scene from.
        mutate: Callback applied in place to the freshly loaded (or freshly
            created) Scene.
        post_write: Optional hook called with the saved Scene while the lock is
            still held.

    Raises:
        SceneVersionConflictError: The active version is not ``expected_version``
            (or there is no manifest and ``expected_version`` is not ``1``).
        SceneStructureImmutableError: ``mutate`` changed the structure fingerprint
            of an existing body.
        FileNotFoundError: A manifest exists but its registered body file does not.
        Exception: Whatever ``mutate``, ``post_write``, or the load/save helpers
            raise propagates unchanged.
    """
    # The manifest path ends in `.json`; swapping that final suffix yields the
    # `.versions.lock` sidecar named in the docstring.
    lock_path = scene_manifest_path(project, episode, batch_id).with_suffix(".lock")
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR)
    try:
        # Exclusive advisory lock; everything up to the `finally` runs under it.
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
        # Step (1)/(2): re-read the manifest under the lock and resolve the body path.
        manifest = load_manifest(project, episode, batch_id)
        if manifest is not None:
            current = active_version(manifest)
            if current != expected_version:
                raise SceneVersionConflictError(batch_id, expected_version, current)
            body_path = manifest_artifact_path(
                project, episode, batch_id, manifest, expected_version
            )
        else:
            # No manifest → implicit v1 active (flat). A non-1 expected_version
            # means the pointer materialized+moved since the caller's load.
            if expected_version != 1:
                raise SceneVersionConflictError(batch_id, expected_version, 1)
            body_path = scene_path(project, episode, batch_id)

        def _post_write_or_rollback(scene: Scene, previous_bytes: bytes | None) -> None:
            # Run the optional hook; on failure undo the body write that preceded it.
            if post_write is None:
                return
            try:
                post_write(scene)
            except Exception:
                if previous_bytes is None:
                    # No prior body existed (cold start): remove the new file.
                    body_path.unlink(missing_ok=True)
                else:
                    # Restore the exact bytes captured before the write.
                    _write_scene_bytes(previous_bytes, body_path)
                raise

        if body_path.exists():
            # Prior bytes are only captured when a hook could require a rollback.
            previous_bytes = body_path.read_bytes() if post_write is not None else None
            scene = load_scene(body_path)
            # Step (3): fingerprint before and after the caller's mutation.
            before = structure_fingerprint(scene)
            mutate(scene)
            after = structure_fingerprint(scene)
            if before != after:
                raise SceneStructureImmutableError(batch_id, expected_version)
            # Step (4): atomic write, then the optional hook.
            save_scene(scene, body_path)
            _post_write_or_rollback(scene, previous_bytes)
        elif manifest is not None:
            # A manifest entry points at a body that is not on disk.
            raise FileNotFoundError(
                f"registered scene body missing for batch {batch_id} "
                f"v{expected_version}: {body_path}"
            )
        else:
            # First materialization of a brand-new batch's initial body (Case A):
            # there is no prior structure to guard, so the mutate populates a fresh
            # Scene and it is written as the implicit v1 active flat body.
            scene = Scene(scene_id=batch_id, beats=[], scene_metadata={})
            mutate(scene)
            save_scene(scene, body_path)
            _post_write_or_rollback(scene, None)
    finally:
        # Release the advisory lock, then close the descriptor, on every exit path.
        fcntl.flock(lock_fd, fcntl.LOCK_UN)
        os.close(lock_fd)


def save_active_scene(
    project: str,
    episode: int | str,
    batch_id: str,
    scene: Scene,
    *,
    expected_version: int,
) -> None:
    """Write the runner's status onto the batch's ACTIVE version body (Writer 1).

    Everything here happens on raw JSON dicts: the on-disk body comes from
    ``_read_scene_dict`` and the runner's view from ``scene.to_dict()``; the body
    is never hydrated through ``Scene.from_dict`` / ``Take.from_dict``.

    When a body already exists, disk-owned structure and finalizer-owned fields
    are kept and only runner-owned status (`takes`, `primary_take_id`,
    `phantom_recovery_count`, selected beat_metadata fields) is overlaid, plus
    enrichment of absent builder-stable structural metadata (enabled only when
    the batch has no manifest). A flat cold start (no manifest, no body yet)
    materializes the runner's full scene as the implicit v1.

    The whole read-check-write sequence runs under an exclusive ``flock`` on the
    manifest's sibling ``.lock`` file.

    Raises:
        SceneVersionConflictError: the active version (or implicit v1 when no
            manifest exists) differs from ``expected_version``.
        FileNotFoundError: a manifest exists but its registered body is missing.
        SceneStructureImmutableError: the runner scene differs structurally
            from the body on disk.
    """
    runner_payload = scene.to_dict()
    lock_file = scene_manifest_path(project, episode, batch_id).with_suffix(".lock")
    lock_file.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(str(lock_file), os.O_CREAT | os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)
        manifest = load_manifest(project, episode, batch_id)

        if manifest is None:
            # Unversioned batch: the flat file is the implicit v1 active body.
            if expected_version != 1:
                raise SceneVersionConflictError(batch_id, expected_version, 1)
            body_path = scene_path(project, episode, batch_id)
            if not body_path.exists():
                # Cold start — nothing on disk to preserve, write the runner scene whole.
                _write_scene_dict(runner_payload, body_path)
                return
            enrich_structure = True
        else:
            # Versioned batch: the caller must have loaded the currently active version.
            active = active_version(manifest)
            if active != expected_version:
                raise SceneVersionConflictError(batch_id, expected_version, active)
            body_path = manifest_artifact_path(
                project, episode, batch_id, manifest, expected_version
            )
            if not body_path.exists():
                raise FileNotFoundError(
                    f"registered scene body missing for batch {batch_id} "
                    f"v{expected_version}: {body_path}"
                )
            enrich_structure = False

        on_disk = _read_scene_dict(body_path)
        # Refuse the write outright if the runner's copy drifted structurally.
        if structural_mutations(on_disk, runner_payload):
            raise SceneStructureImmutableError(batch_id, expected_version)
        merged = _writer1_overlay_scene_dict(
            on_disk,
            runner_payload,
            allow_structural_enrichment=enrich_structure,
        )
        _write_scene_dict(merged, body_path)
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)


def list_scenes(project: str, episode: int | str) -> list[Path]:
    """Return the episode's FLAT Scene JSON paths, sorted; [] when the dir is absent.

    The scene-versioning sidecars sharing the directory (REC-231) are filtered
    out — the per-batch manifest `*.versions.json` and the versioned bodies
    `*.vNNN.json` — so neither can be mistaken for a flat scene. Only flat
    active-scene identities are returned; the pointer-resolved active body is
    obtained through load_scene_active, not by globbing.
    """
    scenes_dir = ProjectPaths.for_project(project).orchestration_scenes_dir
    if not scenes_dir.is_dir():
        return []

    episode_prefix = f"{_canonical_episode_token(episode)}_"
    flat_scenes: list[Path] = []
    for entry in scenes_dir.iterdir():
        if not entry.is_file():
            continue
        filename = entry.name
        # Must be one of this episode's JSON files ...
        if not filename.startswith(episode_prefix) or entry.suffix != ".json":
            continue
        # ... and neither a version manifest nor a versioned body.
        if filename.endswith(".versions.json") or _VERSION_BODY_RE.search(filename):
            continue
        flat_scenes.append(entry)

    flat_scenes.sort()
    return flat_scenes


def _restamp_beats_max_takes(scene: Scene, max_takes: int) -> None:
    """Overwrite ``max_takes`` on each of the scene's beats, in place.

    ``max_takes`` is a NON-structural field (excluded from
    ``structure_fingerprint``); this is the mutate that
    ``init_scenes_from_plan`` applies to scenes that already exist.
    """
    all_beats = scene.beats
    for entry in all_beats:
        entry.max_takes = max_takes


def init_scenes_from_plan(
    project: str, episode: str, plan: dict, max_takes: int = 3,
    *, persist: bool = True,
) -> list[Scene]:
    """Ensure every plan sequence has a Scene JSON; safe to call repeatedly.

    plan["sequences"] is expected to map seq_id → seq spec, where each spec
    carries a "shots" list and every shot becomes one empty Beat. A missing or
    falsy "sequences" entry is treated as no sequences.

    Sequences whose Scene JSON is already on disk are not rebuilt: their beats
    are re-stamped with `max_takes`, so a CLI override (e.g. --max-takes 5)
    reaches existing scenes on resume. ``scene.locked`` is inert UX metadata and
    does not block this non-structural restamp.

    With persist=False the Scenes are built/re-stamped in memory only and
    nothing is written — dry-run relies on this to keep a cost/validation pass
    fully read-only (REC-100).

    Returns one Scene per plan sequence, in plan order, reflecting on-disk state
    after the call.
    """
    def _apply_max_takes(target: Scene) -> None:
        # Shared by the in-memory restamp and the persisted mutate below.
        _restamp_beats_max_takes(target, max_takes)

    scenes: list[Scene] = []
    for seq_id, seq_spec in (plan.get("sequences") or {}).items():
        flat_path = scene_path(project, episode, seq_id)

        if not flat_path.exists():
            # New sequence: one empty Beat per shot. Shots without an id fall
            # back to a positional BEAT_NN name.
            fresh = Scene(
                scene_id=seq_id,
                beats=[
                    Beat(
                        beat_id=str(
                            shot.get("shot_id")
                            or shot.get("id")
                            or f"BEAT_{position:02d}"
                        ),
                        max_takes=max_takes,
                        beat_metadata={"scene_id": seq_id, "plan_shot": shot},
                    )
                    for position, shot in enumerate(seq_spec.get("shots", []))
                ],
                scene_metadata={"episode": episode, "project": project},
            )
            if persist:
                save_scene(fresh, flat_path)
            scenes.append(fresh)
            continue

        # REC-231 Phase 4: the restamp targets the ACTIVE version body so a
        # versioned batch never clobbers v1. The loader hands back the active
        # body together with the version it came from, and the write goes
        # through the structure-guarded status writer keyed on that version.
        existing, loaded_version = load_scene_active_with_version(
            project, episode, seq_id
        )
        _apply_max_takes(existing)
        if persist:
            save_active_scene_status(
                project, episode, seq_id,
                expected_version=loaded_version,
                mutate=_apply_max_takes,
            )
        scenes.append(existing)

    return scenes
