"""Storyboard strip builder for r2v_multi batches."""

from __future__ import annotations

import dataclasses
import hashlib
import json
import logging
import re
from pathlib import Path
from typing import Any, TypedDict

from PIL import Image

from recoil.core import paths as core_paths
from recoil.core.paths import ProjectPaths
from recoil.core.ref_resolver import resolve_character_bundle, resolve_sheet_asset
from recoil.pipeline._lib import dispatch_payload as _dispatch_payload
from recoil.pipeline._lib.dispatch_payload import composite_sheets_enabled
from recoil.pipeline._lib.board_provider import (
    board_fallback_model,
    is_board_refusal,
    select_board_model,
)
from recoil.pipeline._lib import derivation_manifest
from recoil.pipeline._lib.derivation_sha import shotset_hash
from recoil.pipeline._lib.dispatch_payload import PayloadContext
from recoil.pipeline._lib.plan_loader import CanonicalShot
from recoil.pipeline._lib.prompt_engine import get_builder, render_prop_invariant
from recoil.pipeline._lib.shot_primitive import primitive_from_payload_context
from recoil.pipeline._lib.sublocation_registry import (
    location_base_dir,
    load_location_registry,
    sublocation_ref,
    validate_ref_file,
)
from recoil.pipeline._lib.world_state_pass import derive_settings
from recoil.pipeline.core.dispatch import dispatch
from recoil.pipeline.core.dispatch_context import DispatchContext
from recoil.pipeline.core.persistence import (
    SceneVersionConflictError,
    active_version,
    load_manifest,
    load_scene_active,
    load_scene_active_with_version,
    save_active_scene_status,
)
from recoil.pipeline.core.scene_version_store import SceneVersionStore
from recoil.pipeline.core.registry import MODALITY_STORYBOARD
from recoil.pipeline.orchestrator.batch_selector import parse_batch_selector


class BoardBuilderError(RuntimeError):
    """Raised when a storyboard strip cannot be assembled before dispatch."""


# Grid geometry per slot count. 2x2 of 9:16 panels is the PROVEN board shape
# (every validated probe — S-2/S-3/B-1 and the V-D board-as-video-ref test —
# used it) and its 9:16 overall aspect matches the video, which plausibly
# helps the r2v ref channel map panels to frames. The earlier 1xN horizontal
# strip (consult d.4) was unprobed geometry — superseded 2026-06-11 (JT).
# All sizes obey the fal gpt-image-2 caps: dims mult of 16, <=3840 edge,
# <=8,294,400 px, long:short <= 3:1.
GRID_LAYOUTS = {
    # slots: (size_override, cols, rows) — panels are 9:16 within the grid
    1: ("1728x3072", 1, 1),   # single 9:16 panel (probe single-shot arm only)
    4: ("1728x3072", 2, 2),   # 864x1536 panels; 9:16 overall
    6: ("1440x3840", 2, 3),   # 720x1280 panels; ratio 2.67
}

_VERSION_RE = re.compile(r"_v(\d+)\.png$")
logger = logging.getLogger(__name__)
BOARD_FINGERPRINT_VERSION = 2
_ITERATION_QUALITIES = {"low", "medium", "high"}
_ITERATION_SIZES = {"half", "full"}
_PROP_TEXT_FIELDS = ("prompt", "setting", "source_text", "intent")


class CarrierFact(TypedDict):
    prop_id: str
    carrier: str
    description: str


def _assert_carrier_facts_present(prompt: str, carrier_facts: list[CarrierFact]) -> None:
    for fact in carrier_facts:
        expected = render_prop_invariant(
            fact["prop_id"],
            fact["carrier"],
            fact["description"],
        )
        if expected.lower() not in prompt.lower():
            raise BoardBuilderError(
                f"carrier-bound prop {fact['prop_id']!r} "
                f"(attached_to={fact['carrier']!r}): board prompt omits its full "
                "carrier invariant; refusing to dispatch a free-floating render"
            )


def next_board_version(storyboards_dir: Path, batch_id: str) -> int:
    """Return max existing ``{batch_id}_vNN.png`` version + 1."""

    versions: list[int] = []
    for path in Path(storyboards_dir).glob(f"{batch_id}_v*.png"):
        match = _VERSION_RE.search(path.name)
        if match:
            versions.append(int(match.group(1)))
    return max(versions, default=0) + 1


def compute_source_sha256(segments: list[dict], version: int = 1) -> str:
    """version=1 (default, legacy): includes prose-derived `intent`.
    version=2 (D2/L3): STRUCTURE-ONLY — drops `intent`. Same canonicalization."""
    if type(version) is not int or version not in (1, 2):
        raise ValueError(f"unknown board fingerprint version: {version!r}")
    structural = ("shot_id", "start_s", "end_s", "duration_s", "sublocation")
    keys = structural if version == 2 else structural[:4] + ("intent", "sublocation")
    slice_list = [{k: segment.get(k) for k in keys} for segment in segments]
    return hashlib.sha256(json.dumps(slice_list, sort_keys=True).encode()).hexdigest()


def board_record_to_cache(record: dict) -> dict:
    """Rebuild a Beat.board cache dict from a manifest.execution.boards[hash]
    SSOT record (L4). Maps record fields onto the Beat.board schema keys.
    Assumes a well-formed APPROVED record (the only caller path — the resolver
    only calls this after confirming status=='approved'); it does not re-validate
    field non-emptiness (Beat(board=...) construction will if needed)."""
    return {
        "status": record["status"],
        "artifact": record.get("artifact"),
        "source_sha256": record.get("source_sha256"),
        "approved_by": record.get("approved_by"),
        "updated_at": record.get("updated_at"),
        # optional passthroughs (Beat.board allows extra keys):
        "photoreal_artifact": record.get("photoreal_artifact"),
        "fingerprint_version": record.get("fingerprint_version", 1),
        "model": record.get("model"),
        "provider": record.get("provider"),
        "fallback_from": record.get("fallback_from"),
        # REC-240: the composition_ref seam round-trips through the SSOT record so a
        # version-discriminated board carries its manifest+view pointer back to the cache.
        # Absent on legacy flat records → omitted (back-compat: legacy cache unchanged).
        **(
            {"composition_ref": record["composition_ref"]}
            if record.get("composition_ref") is not None
            else {}
        ),
    }


def composition_ref_from_board(
    board: dict,
    *,
    kind: str = "CONT",
    members: list[str] | None = None,
    layout: dict | None = None,
) -> dict:
    """REC-240 PRODUCER: build a ``CompositionManifest`` from an existing board and
    project it into the ``composition_ref`` seam.

    The v1 "composition" trivially wraps today's single batch board: the rendered VIEW is the
    board's already-rendered ``artifact`` PNG (no re-composite of atom-version video — that is
    the future molecule path). ``members`` defaults to an EMPTY manifest (the trivial wrap
    asserts no atom-version pointers yet); a caller with beat-grain atom URNs may pass them
    and they are validated by the real ``CompositionManifest`` schema, making this a genuine
    producer (and ``board.composition_ref`` a derived projection of that manifest, consumed
    back through ``board_record_to_cache``/``resolve_board_for_spend``).

    Returns the ``composition_ref`` dict ``{kind, members, layout, view}`` to store ALONGSIDE
    the flat ``artifact``. Raises if ``board`` has no artifact to wrap.
    """
    artifact = board.get("artifact") if isinstance(board, dict) else None
    if not isinstance(artifact, str) or not artifact:
        raise BoardBuilderError(
            "composition_ref_from_board requires a board with a non-empty artifact"
        )
    # Construct the REAL schema object — this is the production producer of a
    # CompositionManifest (validates kind + member-URN grammar). Lazy import keeps the
    # pipeline._lib → api.schemas edge off module load (mirrors render_composition_view).
    from recoil.api.schemas.composition import CompositionManifest

    manifest = CompositionManifest(
        kind=kind, members=list(members or []), layout=dict(layout or {})
    )
    return {
        "kind": manifest.kind,
        "members": list(manifest.members),
        "layout": dict(manifest.layout),
        # v1 trivial-wrap view = the already-rendered board PNG (pointer → manifest → view).
        "view": artifact,
    }


def resolve_composition_view(composition_ref: dict) -> str:
    """REC-240 indirection: resolve a ``composition_ref`` (pointer → manifest → rendered
    view) to the view PNG path the spend gate should consume.

    For v1 the rendered view is stored on the ref (the trivially-wrapped board PNG); this is
    the single level of indirection the SYNTHESIS reserves. A future molecule path re-renders
    the view from ``members`` via ``render_composition_view`` — this resolver is the seam that
    swap lands behind, with the spend gate unchanged."""
    view = composition_ref.get("view") if isinstance(composition_ref, dict) else None
    if not isinstance(view, str) or not view:
        raise BoardBuilderError(
            "composition_ref resolves to no view: cannot resolve board through the manifest"
        )
    return view


def resolve_board_for_spend(project: str, episode: int, beat) -> "tuple[bool, dict | None]":
    """L4 read-path inversion. Returns (approved, board) for the spend/render gate.

    A board-bearing beat ALWAYS has a derivable shotset_hash (L1 stamps it). For
    such a beat the SSOT is AUTHORITATIVE — the cache can never grant an approval
    the SSOT lacks:
      - SSOT entry present + status=='approved' + not needs_revalidation
            -> approved=True; board rebuilt from the record (artifact available
               even if the cache is empty/stale after a rederive).
      - SSOT entry present but NOT approved (rejected / needs_revalidation)
            -> approved=False (BLOCKS a cache 'approved' — cannot smuggle).
      - SSOT entry MISSING but the shotset_hash IS derivable
            -> approved=False (BLOCK). This is the regroup-safety case: a beat
               whose grouping changed gets a NEW shotset_hash with no SSOT entry,
               and the rederive merge does NOT clear beat.board — so its approved
               CACHE is stale and must NOT be trusted. (CRITICAL R1 fix: do NOT
               fall back to the cache when a shotset_hash is derivable.)
    Legacy cache fallback applies ONLY to a beat with NO derivable shotset_hash
    at all (a non-grouped legacy beat) — never to a board-bearing r2v_multi beat.
    """
    grouping = (beat.beat_metadata or {}).get("grouping") or {}
    if not isinstance(grouping, dict):
        grouping = {}
    h = grouping.get("shotset_hash")
    if not h and grouping.get("shot_ids"):
        h = shotset_hash(grouping["shot_ids"])
    if h:  # board-bearing beat with a shot-set identity → SSOT is authoritative
        record = derivation_manifest.get_board(project, episode, h)
        if record is None:
            return False, beat.board          # no blessing for this shot-set → BLOCK (regroup-safe)
        # writer deferred to the rederive reconciliation pass (D2 follow-up); no live path sets this yet.
        if record.get("status") != "approved" or record.get("needs_revalidation"):
            return False, beat.board          # rejected / flagged → BLOCK
        # FRESHNESS (CRITICAL R2): an approved record must still match the beat's
        # CURRENT structure, else this resolver's gate (incl. the pre-VIDEO preflight
        # SPEND gate, which does no L3 stale check of its own) would dispatch a paid
        # render on a stale board. Recompute the version-aware source_sha256 from the
        # beat's current segments and compare to the record. FAIL-CLOSED: if recompute
        # is impossible, BLOCK (never spend on uncertainty). batch_id is not load-
        # bearing for the recompute (_primitive_from_beat reads beat_metadata.batch_shots).
        try:
            segs = list(getattr(_primitive_from_beat(project, beat.beat_id, beat),
                                "timing_segments", []) or [])
            fresh = compute_source_sha256(segs, version=int(record.get("fingerprint_version", 1)))
        except Exception:
            fresh = None
        if fresh is None or fresh != record.get("source_sha256"):
            return False, beat.board          # structurally stale → BLOCK until --revalidate-board
        # Version-discrimination (REC-231): boards[shotset_hash] is keyed by the shot-id
        # SET, which sibling versions differing only in raw/description SHARE (the deliberate
        # dedup-appends-a-candidate decision), AND their structure-only freshness sha is
        # identical — so neither the key nor the recompute above can tell two versions apart.
        # An approved record must therefore NOT bless a paid render of a DIFFERENT active
        # version's content (e.g. after a revert to a differently-prompted sibling). Refuse
        # unless the record's approval version matches the beat's CURRENT active version.
        # Fail-closed on any manifest read error (never spend on uncertainty). A record with
        # no scene_version (legacy/pre-REC-231) skips the check, preserving prior behavior.
        rec_version = record.get("scene_version")
        if rec_version is not None:
            batch_id = (beat.beat_metadata or {}).get("scene_id")
            if not batch_id:
                # A version-stamped record but no batch identity to prove which
                # version this beat is on → cannot version-discriminate → fail-closed
                # (this is a paid-render guard; never approve spend on uncertainty).
                return False, beat.board
            try:
                manifest = load_manifest(project, episode, batch_id)
                cur_version = active_version(manifest) if manifest is not None else 1
            except Exception:
                return False, beat.board
            if cur_version != rec_version:
                return False, beat.board      # board approved against another version → re-board
        # REC-240: if the version-discriminated record carries a composition_ref, the board
        # is a derived projection of a CompositionManifest — resolve the spend artifact through
        # the manifest (pointer → manifest → view). A legacy flat record has no composition_ref
        # → byte-identical behavior (artifact unchanged).
        return True, _apply_composition_indirection(board_record_to_cache(record))
    # No derivable shotset_hash (non-grouped legacy beat) → cache fallback (defensive only).
    cache = beat.board
    if not (isinstance(cache, dict) and cache.get("status") == "approved"):
        return False, cache
    return True, _apply_composition_indirection(cache)


def _apply_composition_indirection(board: dict | None) -> dict | None:
    """REC-240: when an approved board carries a ``composition_ref``, return a copy whose
    spend ``artifact`` is resolved through the manifest (pointer → manifest → view). A legacy
    flat board (no ``composition_ref``) is returned UNCHANGED — back-compat is mandatory; this
    is the live spend path. Pure read-resolution: no generation, no I/O."""
    if not isinstance(board, dict):
        return board
    composition_ref = board.get("composition_ref")
    if composition_ref is None:
        return board  # legacy flat board → byte-identical behavior
    resolved = dict(board)
    resolved["artifact"] = resolve_composition_view(composition_ref)
    return resolved


def _iteration_tier() -> tuple[str, str]:
    config = core_paths.get_pipeline_config()
    tier = config.get("storyboard_iteration")
    if tier is None:
        return "high", "full"
    if not isinstance(tier, dict):
        raise BoardBuilderError("storyboard_iteration must be a config object")

    quality = tier.get("quality")
    size = tier.get("size")
    if quality not in _ITERATION_QUALITIES:
        raise BoardBuilderError(
            "storyboard_iteration.quality must be one of low, medium, high"
        )
    if size not in _ITERATION_SIZES:
        raise BoardBuilderError("storyboard_iteration.size must be one of half, full")
    return quality, size


def _apply_iteration_size(size_override: str, tier_size: str) -> str:
    if tier_size == "full":
        return size_override
    width_text, height_text = size_override.split("x", 1)
    return f"{int(width_text) // 2}x{int(height_text) // 2}"


def preferred_board_artifact(
    board: dict | None,
    *,
    project_root: str | Path | None = None,
) -> str | None:
    """Return the board artifact video dispatch should consume."""

    if not isinstance(board, dict):
        return None
    photoreal = board.get("photoreal_artifact")
    artifact = board.get("artifact")
    if isinstance(photoreal, str) and photoreal:
        if _artifact_exists(photoreal, project_root=project_root):
            return photoreal
        logger.warning(
            "board_finish_missing photoreal_artifact=%s artifact=%s",
            photoreal,
            artifact,
        )
        if board.get("status") == "approved":
            return None
    elif board.get("status") == "approved" and isinstance(artifact, str) and artifact:
        logger.warning(
            "board_finish_missing photoreal_artifact=%s artifact=%s",
            photoreal,
            artifact,
        )
        return None
    return artifact if isinstance(artifact, str) and artifact else None


def build_and_dispatch_board(
    project: str,
    episode: int,
    batch_id: str,
    *,
    step_runner,
    board_model: str | None = None,
    dry_run: bool = False,
    propose: bool = True,
    fix_notes: list[str] | None = None,
    slots_override: int | None = None,
    expected_version: int | None = None,
) -> dict:
    """Build one storyboard strip for a persisted r2v_multi batch.

    ``slots_override`` forces a specific panel count (must be a key of
    GRID_LAYOUTS and >= the segment count). Used only by the honor-rate
    probe's single-shot arm (slots_override=1); production never passes it.
    """

    selector = parse_batch_selector(batch_id)
    if selector is None:
        raise BoardBuilderError(f"invalid batch selector: {batch_id!r}")
    if selector.episode != int(episode):
        raise BoardBuilderError(
            f"batch selector episode {selector.episode} does not match episode {episode}"
        )

    paths = ProjectPaths.for_project(project)
    ep_token = f"ep_{episode:03d}"
    # REC-231 Phase 4: read the ACTIVE version body + its version atomically; the
    # in-place board-status save below targets that exact version.
    scene, loaded_version = load_scene_active_with_version(
        project, ep_token, selector.scene_id
    )
    if expected_version is not None:
        if loaded_version != expected_version:
            raise SceneVersionConflictError(
                selector.scene_id, expected_version, loaded_version
            )
        loaded_version = expected_version
    beat = _single_r2v_multi_beat(scene)

    primitive = _primitive_from_beat(project, batch_id, beat)
    # dry-run keeps PRODUCTION PROMPT PARITY: derive_settings (a small
    # world-state text-model call, fail-soft) runs in both modes so the
    # operator reviews exactly the prompt the live run will send. The
    # dry-run zero-spend contract covers the gpt-image-2 dispatch (the
    # $0.41 image call) and all state writes — both still skipped below.
    # Fingerprint the RAW source segments BEFORE setting derivation:
    # _derive_board_settings is a model call (non-deterministic), so a hash
    # taken after it can never match the decision path's recompute — every
    # board would read stale once world-state succeeds (latent until the
    # CLI-lane transport made it succeed, surfaced 2026-06-12).
    raw_segments = list(getattr(primitive, "timing_segments", []) or [])
    source_sha256 = compute_source_sha256(
        raw_segments,
        version=BOARD_FINGERPRINT_VERSION,
    )
    segments = _derive_board_settings(paths, primitive)
    n = len(segments)
    if n < 1:
        raise BoardBuilderError("batch has 0 segments; storyboard strip requires at least 1 panel")
    if n > 6:
        raise BoardBuilderError(
            f"batch has {n} segments; the storyboard grid caps at 6 panels "
            "(2x3 at the gpt-image-2 size limits) — split the batch"
        )

    if slots_override is not None:
        if slots_override not in GRID_LAYOUTS:
            raise BoardBuilderError(
                f"slots_override={slots_override} is not a valid layout "
                f"(choose from {sorted(GRID_LAYOUTS)})"
            )
        if n > slots_override:
            raise BoardBuilderError(
                f"slots_override={slots_override} but batch has {n} segments"
            )
        slots = slots_override
    else:
        slots = 4 if n <= 4 else 6
    full_size_override, grid_cols, grid_rows = GRID_LAYOUTS[slots]
    quality, tier_size = _iteration_tier()
    size_override = _apply_iteration_size(full_size_override, tier_size)
    _assert_valid_size_override(size_override)
    board_model = select_board_model(beat=beat, primitive=primitive, override=board_model)

    refs, ref_layout, identity_sidecar_refs, ref_sidecar = _collect_board_refs(
        paths,
        beat,
        primitive,
        segments,
    )
    if len(refs) > 16:
        raise BoardBuilderError(
            "gpt-image-2 reference cap is 16 after identity/sublocation/prop "
            f"refs; got {len(refs)}"
        )

    char_descs = _character_descriptions(paths, ref_layout.get("identity_refs", {}), ref_layout.get("worn_props", {}))
    prompt_builder = get_builder(board_model, "storyboard")
    scene_context = _scene_context_for_batch(paths, episode, beat)
    # REC-180: feed the per-panel derived screen-direction into the board prompt.
    # batch_shots are panel-aligned with `segments`; each carries top-level
    # spatial_data/asset_data/prompt_data under `raw`.
    _batch_shots = (beat.beat_metadata or {}).get("batch_shots") or []
    spatial_shots = [bs.get("raw") or bs for bs in _batch_shots]
    bible = _load_bible(paths)
    carrier_bible = _load_bible_strict(paths)
    carrier_facts = _carrier_facts(
        carrier_bible,
        spatial_shots,
        segments,
        shared_characters=_shared_characters(beat, primitive),
    )
    prompt = prompt_builder(
        segments,
        slots,
        char_descs,
        ref_layout,
        bool(ref_layout.get("sublocation_locked")),
        grid=(grid_cols, grid_rows),
        scene_context=scene_context,
        fix_notes=fix_notes,
        spatial_shots=spatial_shots,
        bible=bible,
        carrier_facts=carrier_facts,
    )
    _assert_carrier_facts_present(prompt, carrier_facts)

    panels = [
        {"segment_id": segment.get("shot_id"), "setting": segment.get("setting")}
        for segment in segments
    ]
    storyboards_dir = paths.episode_storyboards_dir(episode)
    version = next_board_version(storyboards_dir, batch_id)
    filename_stem = f"{batch_id}_v{version:02d}"

    shared_sublocation = ref_sidecar.get("shared_sublocation")
    sidecar_extra = {
        "kind": "storyboard",
        "batch_id": batch_id,
        "panels": panels,
        "sublocation": shared_sublocation,
        "identity_refs": identity_sidecar_refs,
        "sublocation_ref": ref_sidecar.get("shared_ref"),
        "sublocation_refs": ref_sidecar.get("sublocation_refs", []),
        "prop_refs": ref_sidecar.get("prop_refs", []),
        # REC-213 C4: persist worn-prop → carrier so the photoreal finish re-emits
        # the worn prose (the standalone ref is suppressed; without this the finish
        # would lose the prose — the C3 finish-sidecar lesson).
        "worn_props": ref_layout.get("worn_props", {}),
        "source_sha256": source_sha256,
        "fingerprint_version": BOARD_FINGERPRINT_VERSION,
        "model": board_model,
        "provider": board_model,
        "fallback_from": None,
        "strategy": "director_arm_a",
        "prompt": prompt,
        "version": version,
        # REC-231 Phase 5: the ACTIVE scene version this board was derived against
        # (parallel to the runner's global_provenance scene_version stamp). Additive —
        # distinct from "version" (the BOARD strip version) above.
        "scene_version": loaded_version,
        "iteration_tier": {
            "quality": quality,
            "size": tier_size,
            "size_override": size_override,
        },
    }
    payload = {
        "shot_id": batch_id,
        "prompt": prompt,
        "model": board_model,
        "quality": quality,
        "aspect_ratio": "1:1",
        "size_override": size_override,
        "reference_images": refs,
        "save_dir": str(storyboards_dir),
        "filename_stem": filename_stem,
        "sidecar_extra": sidecar_extra,
        "inputs_snapshot": {
            "batch_id": batch_id,
            "scene_id": selector.scene_id,
            "source_sha256": source_sha256,
            "segments": [
                {
                    "shot_id": segment.get("shot_id"),
                    "start_s": segment.get("start_s"),
                    "end_s": segment.get("end_s"),
                    "duration_s": segment.get("duration_s"),
                    "intent": segment.get("intent"),
                    "sublocation": segment.get("sublocation"),
                }
                for segment in segments
            ],
        },
    }

    if dry_run:
        return {
            "prompt": prompt,
            "refs": refs,
            "size_override": size_override,
            "panels": panels,
            "estimated_cost_usd": 0.41,
            "note": "dry-run: zero image dispatch + zero state writes + zero story-gate judge calls; includes the small world-state text call for prompt parity",
        }

    from recoil.pipeline._lib.story_gate import (
        StoryGate,
        StoryGateJudgeUnavailable,
        StoryGatePacket,
        judge_unavailable_verdict,
        story_gate_mode,
        verdict_summary,
        write_verdict,
    )

    mode = story_gate_mode(project)
    if mode == "enforce":
        raise NotImplementedError("story gate enforce ships in v1.1")

    text_stageability_result: dict | None = None
    gate = StoryGate(mode) if mode == "shadow" else None
    if gate is not None:
        packet = StoryGatePacket(
            board_id=filename_stem,
            board_png=storyboards_dir / f"{filename_stem}.png",
            grid_cols=grid_cols,
            grid_rows=grid_rows,
            slots=slots,
            generation_prompt=prompt,
            beats_text=_authoring_text_from_prompt(prompt),
            scene_context=scene_context,
            panels=panels,
            source_sha256=source_sha256,
            character_descriptions=char_descs,
        )
        try:
            text_verdict = gate.evaluate_text(packet)
            text_stageability_result = text_verdict.get("text_stageability")
        except StoryGateJudgeUnavailable as exc:
            text_stageability_result = {
                "route": "judge_unavailable",
                "reason": exc.reason,
            }
            logger.warning("story gate text judge unavailable for %s: %s", filename_stem, exc.reason)
        except Exception as exc:  # noqa: BLE001 - shadow gate must never block generation
            text_stageability_result = {
                "route": "judge_unavailable",
                "reason": repr(exc),
            }
            logger.exception("story gate text pass failed for %s", filename_stem)

    # Recheck immediately before paid dispatch. A microscopic window remains
    # after this read; closing it requires a cross-dispatch lease (out of scope).
    current_manifest = load_manifest(project, ep_token, selector.scene_id)
    current_version = (
        active_version(current_manifest) if current_manifest is not None else 1
    )
    if current_version != loaded_version:
        raise SceneVersionConflictError(
            selector.scene_id, loaded_version, current_version
        )

    receipt = dispatch(
        MODALITY_STORYBOARD,
        payload,
        context=DispatchContext(
            caller_id="board_builder",
            step_runner=step_runner,
            project=project,
            episode=episode,
        ),
    )
    run_result = receipt.run_result
    if is_board_refusal(run_result):
        primary = board_model
        board_model = board_fallback_model(board_model)
        if board_model != primary:
            prompt_builder = get_builder(board_model, "storyboard")
            prompt = prompt_builder(
                segments,
                slots,
                char_descs,
                ref_layout,
                bool(ref_layout.get("sublocation_locked")),
                grid=(grid_cols, grid_rows),
                scene_context=scene_context,
                fix_notes=fix_notes,
                spatial_shots=spatial_shots,
                bible=bible,
                carrier_facts=carrier_facts,
            )
            _assert_carrier_facts_present(prompt, carrier_facts)
            sidecar_extra["model"] = sidecar_extra["provider"] = board_model
            sidecar_extra["fallback_from"] = primary
            sidecar_extra["prompt"] = prompt
            payload["model"] = board_model
            payload["prompt"] = prompt
            payload["sidecar_extra"] = sidecar_extra
            # Recheck immediately before paid dispatch. A microscopic window remains
            # after this read; closing it requires a cross-dispatch lease (out of scope).
            current_manifest = load_manifest(project, ep_token, selector.scene_id)
            current_version = (
                active_version(current_manifest)
                if current_manifest is not None else 1
            )
            if current_version != loaded_version:
                raise SceneVersionConflictError(
                    selector.scene_id, loaded_version, current_version
                )
            receipt = dispatch(
                MODALITY_STORYBOARD,
                payload,
                context=DispatchContext(
                    caller_id="board_builder",
                    step_runner=step_runner,
                    project=project,
                    episode=episode,
                ),
            )
            run_result = receipt.run_result
    if not run_result.success:
        return {
            "success": False,
            "error": run_result.error or "storyboard dispatch failed",
            "receipt": receipt.to_dict(),
        }

    story_gate_result: dict | None = None
    if gate is not None:
        png_path = storyboards_dir / f"{filename_stem}.png"
        try:
            try:
                packet = StoryGatePacket.from_sidecar(png_path)
                verdict = gate.evaluate_board(packet)
                verdict["text_stageability"] = text_stageability_result
            except (StoryGateJudgeUnavailable, ValueError) as exc:
                reason = exc.reason if isinstance(exc, StoryGateJudgeUnavailable) else str(exc)
                logger.warning(
                    "story gate board judge unavailable for %s: %s",
                    filename_stem,
                    reason,
                )
                verdict = judge_unavailable_verdict(
                    board_id=filename_stem,
                    source_sha256=source_sha256,
                    reason=reason,
                )
            except Exception as exc:  # noqa: BLE001 - shadow gate bugs still become visible verdicts
                logger.exception("story gate board pass failed for %s", filename_stem)
                verdict = judge_unavailable_verdict(
                    board_id=filename_stem,
                    source_sha256=source_sha256,
                    reason=repr(exc),
                )

            verdict_path, verdict_hash = write_verdict(storyboards_dir, filename_stem, verdict)
            verdict_rel = str(verdict_path.relative_to(paths.project_root))
            summary = verdict_summary(
                verdict,
                mode="shadow",
                verdict_path=verdict_rel,
                verdict_hash=verdict_hash,
            )
        except Exception as exc:  # noqa: BLE001 - no silent shadow pass on gate bugs
            logger.exception("story gate verdict projection failed for %s", filename_stem)
            verdict = judge_unavailable_verdict(
                board_id=filename_stem,
                source_sha256=source_sha256,
                reason=repr(exc),
            )
            verdict_path, verdict_hash = write_verdict(storyboards_dir, filename_stem, verdict)
            verdict_rel = str(verdict_path.relative_to(paths.project_root))
            summary = verdict_summary(
                verdict,
                mode="shadow",
                verdict_path=verdict_rel,
                verdict_hash=verdict_hash,
            )
        story_gate_result = {
            "route": summary["route"],
            "severity": summary["severity"],
            "verdict_path": summary["verdict_path"],
        }

    artifact = str((storyboards_dir / f"{filename_stem}.png").relative_to(paths.project_root))
    if propose:
        # REC-231 Phase 4: persist the proposed-board STATUS to the ACTIVE version
        # body via the structure-guarded writer (a board pointer is non-structural,
        # so it passes the immutability guard) — never raw save_scene.
        def _apply_board_proposed(s: Scene) -> None:
            b = _single_r2v_multi_beat(s)
            b.set_board_proposed(
                artifact=artifact,
                source_sha256=source_sha256,
                fingerprint_version=BOARD_FINGERPRINT_VERSION,
            )
            if b.board is not None:
                b.board["model"] = sidecar_extra["model"]
                b.board["provider"] = sidecar_extra["provider"]
                b.board["fallback_from"] = sidecar_extra["fallback_from"]
            if gate is not None:
                b.set_board_story_gate(summary)

        save_active_scene_status(
            project, ep_token, selector.scene_id,
            expected_version=loaded_version, mutate=_apply_board_proposed,
        )
        _assert_board_repointed(project, ep_token, selector.scene_id, artifact)
        # REC-231 Phase 5: a successful re-board of the active version clears its
        # not_derived freshness block (the sole clearer). No-op for a flat batch (no
        # manifest); for a versioned batch the active version (loaded_version) is now
        # derived, so the pre-dispatch freshness gate unblocks. mark_derived asserts
        # loaded_version is still active under the lock (a conform mid-build raises).
        SceneVersionStore(project, ep_token).mark_derived(
            selector.scene_id, loaded_version
        )
    result = {
        "success": True,
        "artifact": artifact,
        "source_sha256": source_sha256,
        "fingerprint_version": BOARD_FINGERPRINT_VERSION,
        "model": sidecar_extra["model"],
        "provider": sidecar_extra["provider"],
        "fallback_from": sidecar_extra["fallback_from"],
        "version": version,
        "receipt": receipt.to_dict(),
    }
    if story_gate_result is not None:
        result["story_gate"] = story_gate_result
        if not propose:
            result["_story_gate_summary"] = summary
    return result


def render_board_finish(
    project: str,
    episode: int,
    batch_id: str,
    *,
    step_runner,
    expected_version: int,
    board_model: str | None = None,
) -> dict:
    """Render a full-size photoreal finish for an already-approved board."""

    selector = parse_batch_selector(batch_id)
    if selector is None:
        raise BoardBuilderError(f"invalid batch selector: {batch_id!r}")
    if selector.episode != int(episode):
        raise BoardBuilderError(
            f"batch selector episode {selector.episode} does not match episode {episode}"
        )

    paths = ProjectPaths.for_project(project)
    ep_token = f"ep_{episode:03d}"
    # REC-231 Phase 4: read the ACTIVE version body via the pointer (read-only).
    scene = load_scene_active(project, ep_token, selector.scene_id)
    beat = _single_r2v_multi_beat(scene)
    approved, board = resolve_board_for_spend(project, episode, beat)
    if not approved or not isinstance(board, dict):
        raise BoardBuilderError("photoreal finish requires an approved storyboard")
    beat.board = board

    pencil_artifact = board.get("artifact")
    if not isinstance(pencil_artifact, str) or not pencil_artifact:
        raise BoardBuilderError("approved board artifact missing")
    source_sha256 = board.get("source_sha256")
    if not isinstance(source_sha256, str) or not source_sha256:
        raise BoardBuilderError("approved board source_sha256 missing")

    pencil_path = paths.project_root / pencil_artifact
    if not pencil_path.is_file():
        raise BoardBuilderError(f"approved board artifact missing: {pencil_path}")
    sidecar = _load_generation_sidecar(pencil_path)
    board_artifact_version = _board_version_from_artifact_or_sidecar(
        pencil_artifact,
        sidecar,
    )

    primitive = _primitive_from_beat(project, batch_id, beat)
    raw_segments = list(getattr(primitive, "timing_segments", []) or [])
    fingerprint_version = (beat.board or {}).get("fingerprint_version", 1)
    current_sha256 = compute_source_sha256(
        raw_segments,
        version=fingerprint_version,
    )
    if current_sha256 != source_sha256:
        raise BoardBuilderError(
            "approved board source_sha256 is stale; re-run --storyboard before "
            "rendering the photoreal finish"
        )
    segments = _derive_board_settings(paths, primitive)
    n = len(segments)
    if n < 1:
        raise BoardBuilderError("batch has 0 segments; storyboard finish requires at least 1 panel")
    if n > 6:
        raise BoardBuilderError(
            f"batch has {n} segments; the storyboard grid caps at 6 panels"
        )

    slots = 4 if n <= 4 else 6
    size_override, grid_cols, grid_rows = GRID_LAYOUTS[slots]
    _assert_valid_size_override(size_override)
    board_model = select_board_model(beat=beat, primitive=primitive, override=board_model)

    copied_refs, ref_layout = _finish_refs_from_sidecar(
        paths,
        sidecar,
        beat,
        primitive,
    )
    refs = [str(pencil_path), *copied_refs]
    if len(refs) > 16:
        raise BoardBuilderError(
            "gpt-image-2 reference cap is 16 after composition/identity/"
            f"sublocation/prop refs; got {len(refs)}"
        )

    char_descs = _character_descriptions(paths, ref_layout.get("identity_refs", {}), ref_layout.get("worn_props", {}))
    prompt_builder = get_builder(board_model, "storyboard_finish")
    scene_context = _scene_context_for_batch(paths, episode, beat)
    _batch_shots = (beat.beat_metadata or {}).get("batch_shots") or []
    shots = [bs.get("raw") or bs for bs in _batch_shots]
    carrier_bible = _load_bible_strict(paths)
    carrier_facts = _carrier_facts(
        carrier_bible,
        shots,
        segments,
        shared_characters=_shared_characters(beat, primitive),
    )
    prompt = prompt_builder(
        segments,
        slots,
        char_descs,
        ref_layout,
        bool(ref_layout.get("sublocation_locked")),
        grid=(grid_cols, grid_rows),
        scene_context=scene_context,
        carrier_facts=carrier_facts,
    )
    _assert_carrier_facts_present(prompt, carrier_facts)

    storyboards_dir = paths.episode_storyboards_dir(episode)
    filename_stem = f"{batch_id}_v{board_artifact_version:02d}_photoreal"
    sidecar_extra = {
        "kind": "storyboard_finish",
        "finish_of": pencil_artifact,
        "batch_id": batch_id,
        "version": board_artifact_version,
        "source_sha256": source_sha256,
        "iteration_tier": None,
        "prompt": prompt,
        "model": board_model,
        "provider": board_model,
        "fallback_from": None,
    }
    payload = {
        "shot_id": batch_id,
        "prompt": prompt,
        "model": board_model,
        "quality": "high",
        "aspect_ratio": "1:1",
        "size_override": size_override,
        "reference_images": refs,
        "save_dir": str(storyboards_dir),
        "filename_stem": filename_stem,
        "sidecar_extra": sidecar_extra,
        "inputs_snapshot": {
            "batch_id": batch_id,
            "scene_id": selector.scene_id,
            "finish_of": pencil_artifact,
            "source_sha256": source_sha256,
            "segments": [
                {
                    "shot_id": segment.get("shot_id"),
                    "start_s": segment.get("start_s"),
                    "end_s": segment.get("end_s"),
                    "duration_s": segment.get("duration_s"),
                    "intent": segment.get("intent"),
                    "sublocation": segment.get("sublocation"),
                }
                for segment in raw_segments
            ],
        },
    }

    # Recheck immediately before paid dispatch. A microscopic window remains
    # after this read; closing it requires a cross-dispatch lease (out of scope).
    current_manifest = load_manifest(project, ep_token, selector.scene_id)
    current_version = (
        active_version(current_manifest) if current_manifest is not None else 1
    )
    if current_version != expected_version:
        raise SceneVersionConflictError(
            selector.scene_id, expected_version, current_version
        )

    receipt = dispatch(
        MODALITY_STORYBOARD,
        payload,
        context=DispatchContext(
            caller_id="board_builder",
            step_runner=step_runner,
            project=project,
            episode=episode,
        ),
    )
    run_result = receipt.run_result
    if is_board_refusal(run_result):
        primary = board_model
        board_model = board_fallback_model(board_model)
        if board_model != primary:
            prompt_builder = get_builder(board_model, "storyboard_finish")
            prompt = prompt_builder(
                segments,
                slots,
                char_descs,
                ref_layout,
                bool(ref_layout.get("sublocation_locked")),
                grid=(grid_cols, grid_rows),
                scene_context=scene_context,
                carrier_facts=carrier_facts,
            )
            _assert_carrier_facts_present(prompt, carrier_facts)
            sidecar_extra["model"] = sidecar_extra["provider"] = board_model
            sidecar_extra["fallback_from"] = primary
            sidecar_extra["prompt"] = prompt
            payload["model"] = board_model
            payload["prompt"] = prompt
            payload["sidecar_extra"] = sidecar_extra
            # Recheck immediately before paid dispatch. A microscopic window remains
            # after this read; closing it requires a cross-dispatch lease (out of scope).
            current_manifest = load_manifest(project, ep_token, selector.scene_id)
            current_version = (
                active_version(current_manifest)
                if current_manifest is not None else 1
            )
            if current_version != expected_version:
                raise SceneVersionConflictError(
                    selector.scene_id, expected_version, current_version
                )
            receipt = dispatch(
                MODALITY_STORYBOARD,
                payload,
                context=DispatchContext(
                    caller_id="board_builder",
                    step_runner=step_runner,
                    project=project,
                    episode=episode,
                ),
            )
            run_result = receipt.run_result
    if not run_result.success:
        return {
            "success": False,
            "error": run_result.error or "storyboard finish dispatch failed",
            "receipt": receipt.to_dict(),
        }

    artifact = str((storyboards_dir / f"{filename_stem}.png").relative_to(paths.project_root))
    return {
        "success": True,
        "artifact": artifact,
        "finish_of": pencil_artifact,
        "source_sha256": source_sha256,
        "model": sidecar_extra["model"],
        "provider": sidecar_extra["provider"],
        "fallback_from": sidecar_extra["fallback_from"],
        "version": board_artifact_version,
        "receipt": receipt.to_dict(),
    }


def build_with_auto_reroll(
    project: str,
    episode: int,
    batch: str,
    *,
    step_runner,
    max_attempts: int = 3,
) -> dict:
    """Build a storyboard strip, rerolling fixable HARD board failures."""

    from recoil.pipeline._lib.story_gate import story_gate_mode

    mode = story_gate_mode(project)
    if mode == "off":
        raise BoardBuilderError(
            "auto-reroll requires story_gate_mode to be 'shadow'; "
            "story_gate_mode is off"
        )
    if mode != "shadow":
        raise BoardBuilderError(
            "auto-reroll does not support enforce mode in this slice; "
            f"story_gate_mode is {mode}"
        )
    if max_attempts < 1:
        raise BoardBuilderError("max_attempts must be >= 1")

    selector = parse_batch_selector(batch)
    if selector is None:
        raise BoardBuilderError(f"invalid batch selector: {batch!r}")
    if selector.episode != int(episode):
        raise BoardBuilderError(
            f"batch selector episode {selector.episode} does not match episode {episode}"
        )
    ep_token = f"ep_{episode:03d}"
    _, expected_version = load_scene_active_with_version(
        project, ep_token, selector.scene_id
    )

    attempts: list[dict[str, Any]] = []
    fix_notes: list[str] | None = None
    stopped_reason: str | None = None

    for attempt_number in range(1, max_attempts + 1):
        result = build_and_dispatch_board(
            project,
            episode,
            batch,
            step_runner=step_runner,
            propose=False,
            fix_notes=fix_notes,
            expected_version=expected_version,
        )
        if not result.get("success"):
            stopped_reason = "dispatch_failed"
            result = {
                **result,
                "attempts": attempt_number,
                "reroll_lineage": _reroll_lineage(attempts),
                "stopped_reason": stopped_reason,
            }
            return result

        attempt = _attempt_record(project, result, attempt_number)
        attempts.append(attempt)
        route = attempt["route"]

        if _attempt_is_clean(attempt):
            selected = attempt
            stopped_reason = None
            _finalize_reroll_board(
                project,
                episode,
                batch,
                selected,
                attempts,
                status="proposed",
                expected_version=expected_version,
            )
            return _auto_reroll_result(dict(selected["result"]), attempts, stopped_reason)

        if route == "judge_unavailable":
            selected = _best_attempt_for_abort(attempts)
            stopped_reason = "judge_unavailable"
            _finalize_reroll_board(
                project,
                episode,
                batch,
                selected,
                attempts,
                status="proposed",
                expected_version=expected_version,
            )
            return _auto_reroll_result(
                dict(selected["result"]),
                attempts,
                stopped_reason,
                stopped_attempt=attempt,
            )

        rerollable, reason = _attempt_is_rerollable(attempt)
        if not rerollable:
            selected = _best_attempt_for_rejection(attempts)
            stopped_reason = reason
            _finalize_reroll_board(
                project,
                episode,
                batch,
                selected,
                attempts,
                status="rejected",
                expected_version=expected_version,
            )
            return _auto_reroll_result(
                dict(selected["result"]),
                attempts,
                stopped_reason,
                stopped_attempt=attempt,
            )

        if attempt_number >= max_attempts:
            selected = _best_attempt_for_rejection(attempts)
            stopped_reason = "attempt_cap_reached"
            _finalize_reroll_board(
                project,
                episode,
                batch,
                selected,
                attempts,
                status="rejected",
                expected_version=expected_version,
            )
            return _auto_reroll_result(
                dict(selected["result"]),
                attempts,
                stopped_reason,
                stopped_attempt=selected,
            )

        fix_notes = _extract_fix_notes(attempt["verdict"])
        if not fix_notes:
            selected = _best_attempt_for_rejection(attempts)
            stopped_reason = "no_fix_notes"
            _finalize_reroll_board(
                project,
                episode,
                batch,
                selected,
                attempts,
                status="rejected",
                expected_version=expected_version,
            )
            return _auto_reroll_result(
                dict(selected["result"]),
                attempts,
                stopped_reason,
                stopped_attempt=attempt,
            )

    selected = _best_attempt_for_rejection(attempts)
    stopped_reason = "attempt_cap_reached"
    _finalize_reroll_board(
        project,
        episode,
        batch,
        selected,
        attempts,
        status="rejected",
        expected_version=expected_version,
    )
    return _auto_reroll_result(
        dict(selected["result"]),
        attempts,
        stopped_reason,
        stopped_attempt=selected,
    )


def _auto_reroll_result(
    result: dict,
    attempts: list[dict[str, Any]],
    stopped_reason: str | None,
    *,
    stopped_attempt: dict[str, Any] | None = None,
) -> dict:
    out = {
        key: value for key, value in result.items()
        if not str(key).startswith("_")
    }
    out["attempts"] = len(attempts)
    out["reroll_lineage"] = _reroll_lineage(attempts)
    out["stopped_reason"] = stopped_reason
    if stopped_attempt is not None:
        out["stopped_attempt"] = stopped_attempt.get("attempt")
        out["stopped_route"] = stopped_attempt.get("route")
        verdict_path = (
            (stopped_attempt.get("result") or {})
            .get("story_gate", {})
            .get("verdict_path")
        )
        if verdict_path:
            out["stopped_verdict_path"] = verdict_path
    return out


def _attempt_record(project: str, result: dict, attempt_number: int) -> dict[str, Any]:
    verdict = _load_attempt_verdict(project, result)
    hard_fails, soft_fails = _failed_counts(verdict)
    route = _effective_route(verdict)
    result_for_attempt = _result_with_effective_route(result, route)
    return {
        "attempt": attempt_number,
        "artifact": result_for_attempt.get("artifact"),
        "result": result_for_attempt,
        "verdict": verdict,
        "route": route,
        "hard_fails": hard_fails,
        "soft_fails": soft_fails,
    }


def _load_attempt_verdict(project: str, result: dict) -> dict:
    story_gate = result.get("story_gate") or {}
    verdict_path = story_gate.get("verdict_path")
    if not verdict_path:
        return {}
    paths = ProjectPaths.for_project(project)
    path = paths.project_root / verdict_path
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:  # noqa: BLE001 - malformed/missing verdict ranks as unjudged
        return {}


def _effective_route(verdict: dict) -> str | None:
    if not isinstance(verdict, dict):
        return None
    route = (verdict.get("routing") or {}).get("class")
    text_stageability = verdict.get("text_stageability")
    if (
        isinstance(text_stageability, dict)
        and text_stageability.get("route") == "judge_unavailable"
    ):
        return "judge_unavailable"
    if _has_failed_text_finding(verdict, severity="HARD"):
        if _has_failed_board_check(verdict):
            return "mixed"
        return "script_problem"
    return route


def _result_with_effective_route(result: dict, route: str | None) -> dict:
    out = dict(result)
    if not route:
        return out
    story_gate = out.get("story_gate")
    if isinstance(story_gate, dict):
        out["story_gate"] = {**story_gate, "route": route}
    summary = out.get("_story_gate_summary")
    if isinstance(summary, dict):
        out["_story_gate_summary"] = {**summary, "route": route}
    return out


def _attempt_is_clean(attempt: dict[str, Any]) -> bool:
    route = attempt.get("route")
    if int(attempt.get("hard_fails") or 0) > 0:
        return False
    if route == "ok":
        return True
    return route == "board_problem" and int(attempt.get("hard_fails") or 0) == 0


def _attempt_is_rerollable(attempt: dict[str, Any]) -> tuple[bool, str | None]:
    route = attempt.get("route")
    if route != "board_problem":
        return False, f"non_rerollable_route:{route or 'unknown'}"

    hard_failed_items = _hard_failed_board_items(attempt.get("verdict") or {})
    if not hard_failed_items:
        return False, "no_hard_failures"
    if any(item.get("fix_hint_injectable") is not True for item in hard_failed_items):
        return False, "non_injectable_hard_fail"
    return True, None


def _best_attempt_for_abort(attempts: list[dict[str, Any]]) -> dict[str, Any]:
    return _best_ranked_attempt(attempts)


def _best_attempt_for_rejection(attempts: list[dict[str, Any]]) -> dict[str, Any]:
    return _best_ranked_attempt(attempts)


def _best_ranked_attempt(attempts: list[dict[str, Any]]) -> dict[str, Any]:
    judged = [
        attempt for attempt in attempts
        if attempt.get("route") not in (None, "judge_unavailable")
    ]
    if not judged:
        return attempts[0]
    return max(
        judged,
        key=lambda attempt: (
            -int(attempt.get("hard_fails") or 0),
            -int(attempt.get("soft_fails") or 0),
            int(attempt.get("attempt") or 0),
        ),
    )


def _reroll_lineage(attempts: list[dict[str, Any]]) -> list[dict[str, Any]]:
    return [
        {
            "attempt": attempt["attempt"],
            "artifact": attempt.get("artifact"),
            "route": attempt.get("route"),
            "hard_fails": attempt.get("hard_fails", 0),
            "soft_fails": attempt.get("soft_fails", 0),
        }
        for attempt in attempts
    ]


def _finalize_reroll_board(
    project: str,
    episode: int,
    batch_id: str,
    selected: dict[str, Any],
    attempts: list[dict[str, Any]],
    *,
    status: str,
    expected_version: int,
) -> None:
    selector = parse_batch_selector(batch_id)
    if selector is None:
        raise BoardBuilderError(f"invalid batch selector: {batch_id!r}")
    paths = ProjectPaths.for_project(project)
    ep_token = f"ep_{episode:03d}"
    # REC-231 Phase 4: read the ACTIVE version body + version atomically; the
    # board-status save below targets that version via the structure-guarded writer.
    scene, loaded_version = load_scene_active_with_version(
        project, ep_token, selector.scene_id
    )
    if loaded_version != expected_version:
        raise SceneVersionConflictError(
            selector.scene_id, expected_version, loaded_version
        )
    beat = _single_r2v_multi_beat(scene)

    result = selected["result"]
    artifact = result.get("artifact")
    source_sha256 = result.get("source_sha256")
    if not artifact or not source_sha256:
        raise BoardBuilderError("selected storyboard attempt missing artifact/source_sha256")
    try:
        fp_ver = result["fingerprint_version"]
    except KeyError as exc:
        raise BoardBuilderError(
            "selected storyboard attempt missing fingerprint_version"
        ) from exc
    if status not in ("rejected", "proposed"):
        raise BoardBuilderError(f"invalid reroll final status: {status!r}")

    lineage = _reroll_lineage(attempts)
    _write_reroll_lineage_to_generation_sidecar(paths, artifact, lineage)
    story_gate_summary = result.get("_story_gate_summary")

    # Board status (proposed/rejected pointer + lineage count) is non-structural,
    # so it persists onto the active body through the structure-guarded writer.
    def _apply_reroll_board(s: Scene) -> None:
        b = _single_r2v_multi_beat(s)
        b.set_board_proposed(
            artifact=artifact,
            source_sha256=source_sha256,
            fingerprint_version=fp_ver,
        )
        if b.board is not None:
            for key in ("model", "provider", "fallback_from"):
                if key in result:
                    b.board[key] = result[key]
        if story_gate_summary is not None:
            b.set_board_story_gate(story_gate_summary)
        if status == "rejected":
            b.reject_board("auto_reroll")
        if b.board is not None:
            b.board["reroll_attempts"] = len(attempts)

    save_active_scene_status(
        project, ep_token, selector.scene_id,
        expected_version=expected_version, mutate=_apply_reroll_board,
    )
    if status == "proposed":
        _assert_board_repointed(project, ep_token, selector.scene_id, artifact)
        SceneVersionStore(project, ep_token).mark_derived(
            selector.scene_id, expected_version
        )


def _write_reroll_lineage_to_generation_sidecar(
    paths: ProjectPaths,
    artifact: str,
    lineage: list[dict[str, Any]],
) -> None:
    png_path = paths.project_root / artifact
    sidecar_path = Path(f"{png_path}.json")
    try:
        sidecar = json.loads(sidecar_path.read_text(encoding="utf-8"))
    except FileNotFoundError as exc:
        raise BoardBuilderError(
            f"selected storyboard generation sidecar missing: {sidecar_path}"
        ) from exc
    except json.JSONDecodeError as exc:
        raise BoardBuilderError(
            f"selected storyboard generation sidecar is not valid JSON: {sidecar_path}"
        ) from exc
    if not isinstance(sidecar, dict):
        raise BoardBuilderError(
            f"selected storyboard generation sidecar must be a JSON object: {sidecar_path}"
        )
    sidecar["reroll_lineage"] = lineage
    sidecar_path.parent.mkdir(parents=True, exist_ok=True)
    sidecar_path.write_text(
        json.dumps(sidecar, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )


def _extract_fix_notes(verdict: dict) -> list[str]:
    notes: list[str] = []
    seen: set[str] = set()

    panels = [
        item for item in (verdict.get("panels") or [])
        if isinstance(item, dict)
    ]
    panels.sort(key=lambda item: int(item.get("index") or 0))
    for panel in panels:
        if not _item_has_failed_check(panel):
            continue
        hint = str(panel.get("fix_hint") or "").strip()
        if not hint:
            continue
        note = f"Panel {panel.get('index')}: {hint}"
        if note not in seen:
            seen.add(note)
            notes.append(note)

    transitions = [
        item for item in (verdict.get("transitions") or [])
        if isinstance(item, dict)
    ]
    transitions.sort(key=lambda item: (int(item.get("from") or 0), int(item.get("to") or 0)))
    for transition in transitions:
        if not _item_has_failed_check(transition):
            continue
        hint = str(transition.get("fix_hint") or "").strip()
        if not hint:
            continue
        note = f"Transition {transition.get('from')}->{transition.get('to')}: {hint}"
        if note not in seen:
            seen.add(note)
            notes.append(note)

    return notes


def _failed_counts(verdict: dict) -> tuple[int, int]:
    hard = 0
    soft = 0
    for severity in _failed_severities(verdict):
        if severity == "HARD":
            hard += 1
        elif severity == "SOFT":
            soft += 1
    return hard, soft


def _failed_severities(verdict: dict) -> list[str | None]:
    severities: list[str | None] = []
    text_stageability = verdict.get("text_stageability")
    if isinstance(text_stageability, dict):
        for finding in text_stageability.get("findings", []) or []:
            if isinstance(finding, dict) and finding.get("passed") is False:
                severities.append(finding.get("severity"))

    for item in _board_items(verdict):
        forced_checks = item.get("forced_checks") or {}
        if not isinstance(forced_checks, dict):
            continue
        for entry in forced_checks.values():
            if isinstance(entry, dict) and entry.get("passed") is False:
                severities.append(entry.get("severity"))
    return severities


def _has_failed_text_finding(verdict: dict, *, severity: str | None = None) -> bool:
    text_stageability = verdict.get("text_stageability")
    if not isinstance(text_stageability, dict):
        return False
    for finding in text_stageability.get("findings", []) or []:
        if not isinstance(finding, dict) or finding.get("passed") is not False:
            continue
        if severity is None or finding.get("severity") == severity:
            return True
    return False


def _has_failed_board_check(verdict: dict, *, severity: str | None = None) -> bool:
    return any(
        _item_has_failed_check(item, severity=severity)
        for item in _board_items(verdict)
    )


def _hard_failed_board_items(verdict: dict) -> list[dict]:
    return [
        item for item in _board_items(verdict)
        if _item_has_failed_check(item, severity="HARD")
    ]


def _item_has_failed_check(item: dict, *, severity: str | None = None) -> bool:
    forced_checks = item.get("forced_checks") or {}
    if not isinstance(forced_checks, dict):
        return False
    for entry in forced_checks.values():
        if not isinstance(entry, dict) or entry.get("passed") is not False:
            continue
        if severity is None or entry.get("severity") == severity:
            return True
    return False


def _board_items(verdict: dict) -> list[dict]:
    items: list[dict] = []
    for section in ("panels", "transitions"):
        for item in verdict.get(section, []) or []:
            if isinstance(item, dict):
                items.append(item)
    return items


def _single_r2v_multi_beat(scene):
    beats = [
        beat for beat in scene.beats
        if (beat.beat_metadata or {}).get("modality") == "r2v_multi"
    ]
    if len(beats) != 1:
        raise BoardBuilderError(
            f"storyboard build requires exactly one r2v_multi beat; found {len(beats)}"
        )
    return beats[0]


def _assert_board_repointed(project, ep_token, scene_id, expected_artifact):
    """REC-225: fail loud if the persisted active-scene pointer didn't advance to
    the board we just wrote.

    A board write is two non-atomic steps — the sidecar is written at dispatch,
    then ``beat.board`` is re-pointed via ``save_active_scene_status``. If that
    re-point is silently dropped (a future path that forgets it, a mutate that
    no-ops, or a wrong-namespace/version persist), the sidecar exists while the
    pointer stays stale and the drift only surfaces later at the spend gate
    (observed on live EP001: a v05 sidecar with ``beat.board`` still at v04).
    Re-reading the active scene through the canonical loader and asserting the
    pointer advanced converts that silent drift into an immediate error.
    """
    persisted = load_scene_active(project, ep_token, scene_id)
    beat = _single_r2v_multi_beat(persisted)
    got = beat.board.get("artifact") if beat.board else None
    if got != expected_artifact:
        raise BoardBuilderError(
            f"REC-225: board re-point drift for scene {scene_id!r}: persisted "
            f"beat.board.artifact={got!r}, expected {expected_artifact!r}"
        )


_CANONICAL_SHOT_FIELDS = {f.name for f in dataclasses.fields(CanonicalShot)}


def _as_canonical_shot(shot: Any) -> Any:
    """Tolerant dict->CanonicalShot rebuild for persisted beat_metadata.

    Unknown keys (older scene schemas, test fixtures) are folded into `raw`
    instead of raising, so nothing intent-bearing is lost."""
    if not isinstance(shot, dict):
        return shot
    known = {k: v for k, v in shot.items() if k in _CANONICAL_SHOT_FIELDS}
    extra = {k: v for k, v in shot.items() if k not in _CANONICAL_SHOT_FIELDS}
    raw = dict(known.get("raw") or {})
    raw.update(extra)
    known["raw"] = raw
    for f in dataclasses.fields(CanonicalShot):
        if f.name in known:
            continue
        if (
            f.default is not dataclasses.MISSING
            or f.default_factory is not dataclasses.MISSING  # type: ignore[misc]
        ):
            continue
        known[f.name] = [] if f.name == "characters" else None
    return CanonicalShot(**known)


_SCENE_CONTEXT_MAX_CHARS = 1500


def _scene_context_for_batch(paths: ProjectPaths, episode: int, beat) -> str | None:
    """Contiguous locked-script span covering the batch's shots.

    Per-segment beats carry only their own source sentence, which loses
    cross-shot causality (e.g. "Jade's boots slip. The cable sways." lives in
    no segment, yet explains why the next beat's cable-catch happens) and
    blocking state. The board model receives this span as read-only context.
    Fail-soft: any miss returns None and the board builds without it.
    """
    try:
        script_path = paths.episodes_dir / f"ep_{episode:03d}.md"
        script = script_path.read_text(encoding="utf-8")
    except Exception:  # noqa: BLE001 — context is an enhancement, never a blocker
        return None

    spans: list[tuple[int, int]] = []
    for shot in (beat.beat_metadata or {}).get("batch_shots") or []:
        raw = shot.get("raw") if isinstance(shot, dict) else getattr(shot, "raw", None)
        source_text = (raw or {}).get("source_text") if isinstance(raw, dict) else None
        if not source_text:
            continue
        # Source texts are verbatim script sentences; match on the first line
        # (dialogue blocks span multiple lines and may be reflowed).
        needle = str(source_text).strip().splitlines()[0].strip()
        if len(needle) < 12:
            continue
        idx = script.find(needle)
        if idx != -1:
            spans.append((idx, idx + len(needle)))
    if not spans:
        return None

    start = min(s0 for s0, _ in spans)
    end = max(e0 for _, e0 in spans)
    # Expand back two paragraphs so the causal lead-in is included. The first
    # iteration only snaps to the start of the paragraph containing the match,
    # so three iterations = snap + two paragraphs of lead-in.
    for _ in range(3):
        # Bound the search BEFORE the current break (start - 2), else rfind
        # re-finds the same break forever and the walk never moves.
        cut = script.rfind("\n\n", 0, max(0, start - 2))
        if cut == -1:
            start = 0
            break
        start = cut + 2
    # Expand forward to the end of the final matched paragraph.
    fwd = script.find("\n\n", end)
    end = len(script) if fwd == -1 else fwd
    context = script[start:end].strip()
    if len(context) > _SCENE_CONTEXT_MAX_CHARS:
        context = context[:_SCENE_CONTEXT_MAX_CHARS].rsplit(" ", 1)[0] + " …"
    return context or None


def _authoring_text_from_prompt(prompt: str) -> str:
    marker = "AUTHORING"
    idx = prompt.find(marker)
    return prompt[idx:] if idx >= 0 else prompt


def _primitive_from_beat(project: str, batch_id: str, beat) -> Any:
    metadata = beat.beat_metadata or {}
    batch_shots = metadata.get("batch_shots") or []
    if not batch_shots:
        raise BoardBuilderError("r2v_multi storyboard target has no batch_shots")
    # Persisted beat_metadata carries shots as plain dicts; the live dispatch
    # path rebuilds CanonicalShot first (episode_runner._build_workflow_for_beat).
    # Mirror it — _raw_mapping(dict) returns the TOP-LEVEL dict (source_text is
    # null there), so dict-shaped shots silently degrade segment intent to the
    # "shot intent" placeholder and the board beats lose the script text.
    shots_cs = [_as_canonical_shot(s) for s in batch_shots]
    shot_cs = _as_canonical_shot(metadata.get("shot") or batch_shots[0])
    ctx = PayloadContext(
        project=project,
        modality="r2v_multi",
        shot_id=batch_id,
        shot=shot_cs,
        batch_shots=shots_cs,
    )
    return primitive_from_payload_context(ctx, ref_manifest={})


def _derive_board_settings(paths: ProjectPaths, primitive: Any) -> list[dict]:
    sublocations = None
    if getattr(primitive, "location_id", None):
        try:
            registry = load_location_registry(paths, primitive.location_id)
            sublocations = (registry or {}).get("sublocations") or None
        except Exception:  # noqa: BLE001 - settings are fail-soft like dispatch_payload
            sublocations = None
    return derive_settings(
        list(getattr(primitive, "timing_segments", []) or []),
        location_id=getattr(primitive, "location_id", None),
        char_ids=list(getattr(primitive, "char_ids", []) or []),
        sublocations=sublocations,
        model=None,
    )


def _collect_board_refs(paths, beat, primitive, segments: list[dict]):
    refs: list[str] = []
    ref_layout: dict[str, Any] = {"identity_refs": {}}
    identity_sidecar_refs: list[str] = []

    for char_id in _shared_characters(beat, primitive):
        bundle = resolve_character_bundle(paths, char_id, phase=_phase_for_char(beat, char_id))
        front = bundle.view_path("front") or bundle.view_path("hero")
        profile = bundle.view_path("profile")
        if front is None or profile is None:
            # Phase-specific variants are rare on disk — fall back to base
            # refs, mirroring _collect_reference_images' documented cascade
            # (phase=... -> None -> base hero/angles).
            base_bundle = resolve_character_bundle(paths, char_id)
            front = front or base_bundle.view_path("front") or base_bundle.view_path("hero")
            profile = profile or base_bundle.view_path("profile")
        char_paths = [p for p in (front, profile) if p is not None]
        if len(char_paths) != 2:
            raise BoardBuilderError(
                f"storyboard identity refs for {char_id} require front/hero and profile"
            )
        start = len(refs) + 1
        for path in char_paths:
            refs.append(str(path))
            identity_sidecar_refs.append(_project_relative(paths, Path(path)))
        ref_layout["identity_refs"][char_id] = (start, len(refs))

    ref_sidecar = _append_sublocation_ref(paths, primitive, segments, refs, ref_layout)
    suppress_worn = _raw_suppress_worn_entries(beat)
    prop_sidecar_refs = _append_prop_refs(
        paths,
        segments,
        refs,
        ref_layout,
        suppress_worn=suppress_worn,
    )
    prop_layout = ref_layout.get("prop_refs") or {}
    prop_slugs = sorted(prop_layout, key=lambda item: int(prop_layout[item]))
    ref_sidecar["prop_refs"] = [
        {"slug": slug, "ref": prop_sidecar_refs[index]}
        for index, slug in enumerate(prop_slugs)
        if index < len(prop_sidecar_refs)
    ]
    return refs, ref_layout, identity_sidecar_refs, ref_sidecar


def _append_sublocation_ref(paths, primitive, segments, refs, ref_layout) -> dict[str, Any]:
    location_id = getattr(primitive, "location_id", None)
    raw_sublocations = [segment.get("sublocation") for segment in segments]
    present = [str(s) for s in raw_sublocations if s]
    if not location_id or not present:
        return {"shared_sublocation": None, "shared_ref": None, "sublocation_refs": []}

    first = present[0]
    # Locked ONLY when EVERY segment carries the SAME sublocation — a segment
    # with no sublocation breaks the lock (spec: "when every segment shares
    # one sublocation"); mixed/missing still attach the first ref, unlocked.
    locked = len(present) == len(raw_sublocations) and all(s == first for s in present)
    distinct: list[str] = []
    seen: set[str] = set()
    for name in present:
        if name in seen:
            continue
        seen.add(name)
        distinct.append(name)

    ref_layout["sublocation_locked"] = locked
    ref_layout["sublocation_refs"] = []
    sidecar_refs: list[dict[str, str]] = []
    shared_ref: str | None = None

    # REC-213 C3: for a board LOCKED to one sublocation with composite sheets enabled,
    # prefer the location's composite SHEET (the multi-angle establishing sheet) over the
    # individual sublocation plate — the same resolve_sheet_asset route the video path uses
    # (one activation SSOT, gated by the SAME use_composite_sheets flag). Char/prop refs stay
    # individual. Absent sheet -> WARN + fall through to the plate; corrupt -> SheetIntegrityError.
    if locked and composite_sheets_enabled(paths.project):
        sheet = resolve_sheet_asset(paths, "loc", location_id)  # corrupt -> SheetIntegrityError propagates
        if sheet is not None:
            refs.append(str(sheet.path))
            all_panels = list(range(1, len(segments) + 1))
            ref_layout["sublocation_ref"] = len(refs)
            ref_layout["sublocation_panels"] = all_panels
            ref_layout["sublocation_refs"] = [
                {"slug": first, "index": len(refs), "panels": all_panels}
            ]
            return {
                "shared_sublocation": first,
                # location-relative provenance (sheet is a SIBLING of base/, not under it)
                "shared_ref": str(sheet.path.relative_to(location_base_dir(paths, location_id).parent)),
                # project-relative ref for the sidecar -> photoreal finish re-read
                "sublocation_refs": [{"slug": first, "ref": _project_relative(paths, sheet.path)}],
            }
        logger.warning(
            "board: location %s is locked to one sublocation but has NO composite sheet at %s — "
            "using the individual sublocation plate (anti-masking: a missing sheet must not silently "
            "reintroduce the wrong-location-ref class)",
            location_id, paths.sheet_path("loc", location_id),
        )

    for name in distinct[:3]:
        ref_path = sublocation_ref(paths, location_id, name)
        if ref_path is None:
            raise BoardBuilderError(
                f"no sublocation establishing ref for {location_id}/{name}"
            )
        validation_error = validate_ref_file(ref_path)
        if validation_error:
            raise BoardBuilderError(validation_error)

        refs.append(str(ref_path))
        panels = [
            i for i, segment in enumerate(segments, start=1)
            if segment.get("sublocation") == name
        ]
        ref_layout["sublocation_refs"].append(
            {"slug": name, "index": len(refs), "panels": panels}
        )
        sidecar_refs.append({"slug": name, "ref": _project_relative(paths, ref_path)})
        if locked and name == first:
            ref_layout["sublocation_ref"] = len(refs)
            ref_layout["sublocation_panels"] = panels
            shared_ref = str(ref_path.relative_to(location_base_dir(paths, location_id)))

    return {
        "shared_sublocation": first if locked else None,
        "shared_ref": shared_ref if locked else None,
        "sublocation_refs": sidecar_refs,
    }


def _worn_carrier_in_shot(attached_to, identity_refs) -> str | None:
    """REC-213 C4: resolve a prop's bible `attached_to` carrier case-insensitively
    against the shot's identity_refs keys. Returns the ACTUAL identity_refs key
    (so worn-prop prose attaches under the char-id form _character_descriptions
    uses), or None when the carrier has no prose home in this shot."""
    if not isinstance(attached_to, str) or not attached_to.strip():
        return None
    want = attached_to.strip().upper()
    for key in identity_refs or {}:
        if str(key).strip().upper() == want:
            return key
    return None


def _append_worn_permanent_ref(paths, slug, refs, ref_layout, sidecar_refs) -> bool:
    """REC-247: append a permanent worn prop's hero ref (when one exists) so the model
    can match its exact appearance — the storyboard prompt labels it worn so it stays
    on the body. Records the slug in prop_refs at its 1-based position in ``refs``.
    Returns True iff a ref was appended. Callers gate on permanence / suppress_worn /
    the auto-inject switch; this helper only resolves + records the ref.
    """
    ref_path = _prop_identity_ref(paths, slug)
    if ref_path is None:
        return False
    refs.append(str(ref_path))
    ref_layout.setdefault("prop_refs", {})[slug] = len(refs)
    sidecar_refs.append(_project_relative(paths, ref_path))
    return True


def _append_prop_refs(paths, segments, refs, ref_layout, *, suppress_worn=None) -> list[str]:
    sidecar_refs: list[str] = []
    detected = _detected_prop_slugs(paths, segments)
    auto_inject_enabled = _worn_prop_auto_inject_enabled(paths.project)
    if not detected and not auto_inject_enabled:
        return sidecar_refs

    # REC-213 C4: a prop the bible marks `attached_to` a character PRESENT in the
    # shot is WORN -> rendered as prose on the carrier (Phase 2 _character_descriptions),
    # NOT a standalone ref. Fail-LOUD bible load (worn-ness controls ref emission).
    props_bible = _load_bible_strict(paths).get("props", {})
    identity_refs = ref_layout.get("identity_refs", {})
    suppress_worn_slugs = _normalize_suppress_worn(suppress_worn, props_bible)

    ref_layout["prop_refs"] = {}
    for slug in detected:
        prop_entry = props_bible.get(slug) if isinstance(props_bible, dict) else None
        attached_to = prop_entry.get("attached_to") if isinstance(prop_entry, dict) else None
        carrier = _worn_carrier_in_shot(attached_to, identity_refs)
        if carrier is not None:
            # WORN + carrier has a prose home -> record under the carrier for the
            # prose handoff. By default (REC-219) suppress the standalone ref to
            # avoid a free-floating prop.
            worn = ref_layout.setdefault("worn_props", {}).setdefault(carrier, [])
            if slug not in worn:
                worn.append(slug)
            # REC-247: a PERMANENT-attachment worn prop with a hero ref ALSO gets the
            # ref (labeled WORN in the prompt) so the model can match a distinctive
            # readout — e.g. the debt counter — instead of inventing a garbled one each
            # panel. The "worn, never free-floating" prompt label keeps the REC-219
            # protection; the prose still emits (slug stays in worn_props). Gated on
            # the same worn-prop handling switch (auto_inject_enabled) and honors an
            # explicit suppress_worn override (no ref when the operator suppressed it).
            include_worn_ref = (
                isinstance(prop_entry, dict)
                and prop_entry.get("is_permanent_attachment") is True
                and auto_inject_enabled
                and slug not in suppress_worn_slugs
            )
            if include_worn_ref and _append_worn_permanent_ref(
                paths, slug, refs, ref_layout, sidecar_refs
            ):
                logger.info(
                    "prop_ref_worn_permanent slug=%s carrier=%s (ref INCLUDED, labeled worn)",
                    slug, carrier,
                )
                if len(sidecar_refs) >= 2:
                    break
            else:
                logger.info("prop_ref_worn slug=%s carrier=%s (prose, no standalone ref)", slug, carrier)
            continue
        # FAIL-SAFE: attached_to set but carrier not in this shot -> KEEP the ref
        # (no prose home; never vanish a prop). Non-worn props: unchanged.
        ref_path = _prop_identity_ref(paths, slug)
        if ref_path is None:
            logger.info("prop_ref_skipped slug=%s reason=no_identity_ref", slug)
            continue
        refs.append(str(ref_path))
        ref_layout["prop_refs"][slug] = len(refs)
        sidecar_refs.append(_project_relative(paths, ref_path))
        if len(sidecar_refs) >= 2:
            break
    if auto_inject_enabled:
        for slug, prop_entry in (props_bible or {}).items():
            if not isinstance(prop_entry, dict):
                continue
            if prop_entry.get("is_permanent_attachment") is not True:
                continue
            attached_to = prop_entry.get("attached_to")
            carrier = _worn_carrier_in_shot(attached_to, identity_refs)
            if carrier is None or slug in suppress_worn_slugs:
                continue
            worn = ref_layout.setdefault("worn_props", {}).setdefault(carrier, [])
            if slug in worn:
                continue
            worn.append(slug)
            # REC-247: an auto-injected permanent worn prop (one not literally named
            # in the beat text — e.g. the debt counter referenced as "the counter" /
            # "amber readout") ALSO gets its hero ref, labeled worn, so the model
            # matches its appearance instead of inventing it. Same prose+ref contract
            # as the detected path above; suppress_worn/permanent are already gated.
            # Respect the same 2-prop-ref cap as the detected loop (which may already
            # have filled it) BEFORE appending — never a 3rd prop ref.
            if len(sidecar_refs) >= 2:
                logger.info("prop_ref_worn_autoinject slug=%s carrier=%s (prose, cap reached)", slug, carrier)
            elif _append_worn_permanent_ref(paths, slug, refs, ref_layout, sidecar_refs):
                logger.info("prop_ref_worn_permanent_autoinject slug=%s carrier=%s", slug, carrier)
                if len(sidecar_refs) >= 2:
                    break
            else:
                logger.info("prop_ref_worn_autoinject slug=%s carrier=%s (prose, no ref)", slug, carrier)
    if not ref_layout.get("prop_refs"):
        ref_layout.pop("prop_refs", None)
    return sidecar_refs


def _raw_suppress_worn_entries(beat) -> list[Any]:
    entries: list[Any] = []
    for shot in (beat.beat_metadata or {}).get("batch_shots") or []:
        raw = shot.get("raw") if isinstance(shot, dict) else getattr(shot, "raw", None)
        candidates = []
        if isinstance(shot, dict):
            candidates.append(shot)
        if isinstance(raw, dict) and raw is not shot:
            candidates.append(raw)
        for source in candidates:
            value = source.get("suppress_worn")
            if value is None:
                continue
            if isinstance(value, list):
                entries.extend(value)
            else:
                entries.append(value)
    return entries


def _normalize_suppress_worn(suppress_worn, props_bible) -> set[str]:
    if not suppress_worn:
        return set()
    props = props_bible if isinstance(props_bible, dict) else {}
    by_normalized = {
        _normalize_prop_slug(slug): slug
        for slug in props
        if isinstance(slug, str) and _normalize_prop_slug(slug)
    }
    normalized: set[str] = set()
    for entry in suppress_worn:
        if not isinstance(entry, str):
            raise BoardBuilderError(f"suppress_worn: unknown prop slug {entry!r}")
        slug = by_normalized.get(_normalize_prop_slug(entry))
        if slug is None:
            raise BoardBuilderError(f"suppress_worn: unknown prop slug {entry!r}")
        normalized.add(slug)
    return normalized


def _worn_prop_auto_inject_enabled(project: str) -> bool:
    cfg = _dispatch_payload._project_config_cache.get(project)
    if cfg is None:
        try:
            cfg = _dispatch_payload.load_project_config(project) or {}
        except Exception:  # noqa: BLE001 - absent/unreadable config keeps default-on behavior
            cfg = {}
        else:
            _dispatch_payload._project_config_cache[project] = cfg
    return cfg.get("worn_prop_auto_inject", True) is not False


def _detected_prop_slugs(paths, segments) -> list[str]:
    prop_slugs = _prop_registry_slugs(paths)
    if not prop_slugs:
        return []
    seen: set[str] = set()
    detected: list[str] = []
    for segment in segments or []:
        text = _normalized_prop_text(segment)
        if not text:
            continue
        matches: list[tuple[int, int, str]] = []
        for order, slug in enumerate(prop_slugs):
            if slug in seen:
                continue
            needle = _normalize_prop_slug(slug)
            if not needle:
                continue
            pattern = r"(?<![a-z0-9])" + re.escape(needle).replace(r"\ ", r"\s+") + r"(?![a-z0-9])"
            match = re.search(pattern, text)
            if match:
                matches.append((match.start(), order, slug))
        for _pos, _order, slug in sorted(matches):
            if slug in seen:
                continue
            seen.add(slug)
            detected.append(slug)
    return detected


def _prop_registry_slugs(paths) -> list[str]:
    registry_path = paths.assets_dir / "prop" / "prop.json"
    if not registry_path.is_file():
        return []
    try:
        data = json.loads(registry_path.read_text(encoding="utf-8"))
    except Exception:  # noqa: BLE001 - malformed prop inventory should not block boards
        logger.info("prop_ref_skipped reason=invalid_registry path=%s", registry_path)
        return []
    if not isinstance(data, dict):
        return []
    return [str(slug) for slug in data if str(slug).strip()]


def _prop_identity_ref(paths, slug: str) -> Path | None:
    identity_dir = paths.pool_dir("prop", slug, "identity", look="base")
    matches = sorted(identity_dir.glob("*.png"))
    if not matches:
        return None
    return matches[0]


def _normalized_prop_text(segment) -> str:
    parts: list[str] = []
    for field in _PROP_TEXT_FIELDS:
        value = segment.get(field) if isinstance(segment, dict) else getattr(segment, field, None)
        if value:
            parts.append(str(value))
    return _normalize_prop_slug(" ".join(parts))


def _normalize_prop_slug(text: str) -> str:
    normalized = str(text).lower().replace("_", " ").replace("-", " ")
    normalized = re.sub(r"[^a-z0-9]+", " ", normalized)
    return re.sub(r"\s+", " ", normalized).strip()


def _carrier_facts(
    bible: dict,
    shots: list[dict],
    segments: list[dict],
    shared_characters: list[str] | None = None,
) -> list[CarrierFact]:
    """Carrier invariants for carrier-bound props active in this batch.

    Returns structured facts only. Prompt bytes are owned by prompt_engine.
    """
    if not bible:
        return []
    if not isinstance(bible, dict):
        raise BoardBuilderError("carrier bible must be a JSON object")
    if "props" not in bible:
        return []
    props = bible["props"]
    if not isinstance(props, dict):
        raise BoardBuilderError("carrier bible 'props' must be an object")

    structured_props: set[str] = set()
    carrier_chars: set[str] = {
        str(char).strip().upper()
        for char in (shared_characters or [])
        if str(char).strip()
    }
    for shot in shots or []:
        if not isinstance(shot, dict):
            continue
        raw_shot = shot.get("raw") or shot
        if not isinstance(raw_shot, dict):
            continue
        asset_data = raw_shot.get("asset_data") or {}
        if not isinstance(asset_data, dict):
            continue
        for prop in asset_data.get("props") or []:
            prop_id = prop.get("prop_id") if isinstance(prop, dict) else None
            if prop_id:
                structured_props.add(str(prop_id))
        for character in asset_data.get("characters") or []:
            char_id = character.get("char_id") if isinstance(character, dict) else None
            if char_id:
                carrier_chars.add(str(char_id).strip().upper())

    segment_text = " ".join(
        _normalized_prop_text(segment) for segment in (segments or [])
    ).strip()

    facts: list[CarrierFact] = []
    for prop_id, entry in props.items():
        if not isinstance(entry, dict):
            raise BoardBuilderError(f"carrier bible prop {prop_id!r} must be an object")
        if "is_permanent_attachment" in entry and not isinstance(
            entry["is_permanent_attachment"],
            bool,
        ):
            raise BoardBuilderError(
                f"carrier bible prop {prop_id!r} is_permanent_attachment must be a bool"
            )
        attached_to = entry.get("attached_to")
        if not attached_to or entry.get("is_permanent_attachment") is not True:
            continue
        carrier_key = str(attached_to).strip().upper()
        if not carrier_key:
            continue

        needle = _normalize_prop_slug(str(prop_id))
        text_active = False
        if needle and segment_text:
            pattern = (
                r"(?<![a-z0-9])"
                + re.escape(needle).replace(r"\ ", r"\s+")
                + r"(?![a-z0-9])"
            )
            text_active = re.search(pattern, segment_text) is not None
        active = (
            str(prop_id) in structured_props
            or text_active
            or carrier_key in carrier_chars
        )
        if active:
            facts.append({
                "prop_id": str(prop_id),
                "carrier": str(attached_to),
                "description": str(entry.get("description") or ""),
            })
    return facts


def _artifact_exists(
    artifact: str,
    *,
    project_root: str | Path | None = None,
) -> bool:
    path = Path(artifact).expanduser()
    if path.is_absolute():
        return path.is_file()
    if project_root is not None and (Path(project_root) / path).is_file():
        return True
    return path.is_file()


def _load_generation_sidecar(png_path: Path) -> dict[str, Any]:
    sidecar_path = Path(f"{png_path}.json")
    try:
        sidecar = json.loads(sidecar_path.read_text(encoding="utf-8"))
    except FileNotFoundError as exc:
        raise BoardBuilderError(
            f"approved storyboard generation sidecar missing: {sidecar_path}"
        ) from exc
    except json.JSONDecodeError as exc:
        raise BoardBuilderError(
            f"approved storyboard generation sidecar is not valid JSON: {sidecar_path}"
        ) from exc
    if not isinstance(sidecar, dict):
        raise BoardBuilderError(
            f"approved storyboard generation sidecar must be a JSON object: {sidecar_path}"
        )
    return sidecar


def _board_version_from_artifact_or_sidecar(artifact: str, sidecar: dict[str, Any]) -> int:
    version = sidecar.get("version")
    if isinstance(version, int) and version > 0:
        return version
    match = _VERSION_RE.search(Path(artifact).name)
    if match:
        return int(match.group(1))
    raise BoardBuilderError(f"approved board artifact has no _vNN version: {artifact}")


def _finish_refs_from_sidecar(
    paths: ProjectPaths,
    sidecar: dict[str, Any],
    beat,
    primitive,
) -> tuple[list[str], dict[str, Any]]:
    refs: list[str] = []
    ref_layout: dict[str, Any] = {"identity_refs": {}}

    identity_refs = sidecar.get("identity_refs") or []
    if not isinstance(identity_refs, list):
        raise BoardBuilderError("storyboard generation sidecar identity_refs must be a list")
    for ref in identity_refs:
        refs.append(str(_resolve_sidecar_ref(paths, ref)))

    # The finish dispatch prepends the approved pencil board as @Image1, so
    # every sidecar-copied reference is offset by one in the finish prompt.
    composition_offset = 1
    start = 1 + composition_offset
    for char_id in _shared_characters(beat, primitive):
        end = start + 1
        if end <= len(identity_refs) + composition_offset:
            ref_layout["identity_refs"][char_id] = (start, end)
        start += 2

    sublocation_entries = _sidecar_ref_entries(sidecar.get("sublocation_refs"))
    if sublocation_entries:
        ref_layout["sublocation_refs"] = []
        for entry in sublocation_entries:
            path = _resolve_sidecar_ref(paths, entry["ref"])
            refs.append(str(path))
            ref_layout["sublocation_refs"].append({
                "slug": entry.get("slug") or "sublocation",
                "index": len(refs) + composition_offset,
            })
        ref_layout["sublocation_locked"] = bool(sidecar.get("sublocation"))

    prop_entries = _sidecar_ref_entries(sidecar.get("prop_refs"))
    if prop_entries:
        ref_layout["prop_refs"] = []
        for entry in prop_entries:
            path = _resolve_sidecar_ref(paths, entry["ref"])
            refs.append(str(path))
            ref_layout["prop_refs"].append({
                "slug": entry.get("slug") or "prop",
                "index": len(refs) + composition_offset,
            })

    # REC-213 C4: restore worn-prop → carrier so the finish _character_descriptions
    # re-emits the worn prose (the suppressed standalone ref is intentionally absent).
    worn_props = sidecar.get("worn_props")
    if isinstance(worn_props, dict) and worn_props:
        ref_layout["worn_props"] = {
            str(carrier): list(slugs or [])
            for carrier, slugs in worn_props.items()
        }

    return refs, ref_layout


def _sidecar_ref_entries(value: Any) -> list[dict[str, str]]:
    if not value:
        return []
    if not isinstance(value, list):
        raise BoardBuilderError("storyboard generation sidecar ref entries must be a list")
    entries: list[dict[str, str]] = []
    for item in value:
        if isinstance(item, str):
            entries.append({"ref": item})
            continue
        if not isinstance(item, dict):
            raise BoardBuilderError("storyboard generation sidecar ref entry must be a string or object")
        ref = item.get("ref") or item.get("path") or item.get("artifact")
        if not isinstance(ref, str) or not ref:
            raise BoardBuilderError("storyboard generation sidecar ref entry missing ref")
        entry = {"ref": ref}
        slug = item.get("slug")
        if isinstance(slug, str) and slug:
            entry["slug"] = slug
        entries.append(entry)
    return entries


def _resolve_sidecar_ref(paths: ProjectPaths, ref: Any) -> Path:
    if not isinstance(ref, str) or not ref:
        raise BoardBuilderError("storyboard generation sidecar ref path must be a non-empty string")
    path = Path(ref).expanduser()
    if not path.is_absolute():
        path = paths.project_root / path
    if not path.is_file():
        raise BoardBuilderError(f"storyboard finish sidecar ref missing: {path}")
    return path


def _shared_characters(beat, primitive) -> list[str]:
    metadata = beat.beat_metadata or {}
    summary = metadata.get("batch_summary") or {}
    element_summary = ((metadata.get("element_config") or {}).get("batch_summary") or {})
    raw = (
        metadata.get("shared_characters")
        or summary.get("shared_characters")
        or element_summary.get("shared_characters")
        or getattr(primitive, "char_ids", [])
        or []
    )
    seen: set[str] = set()
    chars: list[str] = []
    for item in raw:
        cid = str(item.get("char_id") if isinstance(item, dict) else item).strip().upper()
        if cid and cid not in seen:
            seen.add(cid)
            chars.append(cid)
    return chars


def _phase_for_char(beat, char_id: str) -> str | None:
    for shot in (beat.beat_metadata or {}).get("batch_shots") or []:
        raw_chars = []
        if isinstance(shot, dict):
            raw_chars.extend((shot.get("asset_data") or {}).get("characters") or [])
            raw_chars.extend(shot.get("characters") or [])
        else:
            raw_chars.extend(getattr(shot, "characters", []) or [])
        for entry in raw_chars:
            if isinstance(entry, dict):
                cid = str(entry.get("char_id") or entry.get("id") or "").strip().upper()
                phase = entry.get("wardrobe_phase_id")
            else:
                cid = str(getattr(entry, "char_id", entry)).strip().upper()
                phase = getattr(entry, "wardrobe_phase_id", None)
            if cid == char_id and phase:
                return str(phase)
    return None


def _character_descriptions(
    paths: ProjectPaths,
    identity_refs: dict[str, Any],
    worn_props: dict[str, Any] | None = None,
) -> dict[str, str]:
    worn_props = worn_props or {}
    # REC-213 C4: when there are worn props to render, the bible CONTROLS output —
    # load it strictly (a malformed bible must not silently drop the prose for an
    # already-suppressed ref). Otherwise keep the prompt-enhancement fail-soft read.
    bible = _load_bible_strict(paths) if worn_props else _load_bible(paths)
    characters = bible.get("characters") if isinstance(bible, dict) else {}
    props_bible = bible.get("props") if isinstance(bible, dict) else {}
    out: dict[str, str] = {}
    for char_id in identity_refs:
        desc = ""
        if isinstance(characters, dict):
            entry = characters.get(char_id) or characters.get(char_id.upper())
            if isinstance(entry, dict):
                desc = str(entry.get("visual_description") or "").strip()
        base = _first_sentence(desc) or "visual identity reference"
        # Append worn-prop prose AFTER _first_sentence so it is NOT clipped.
        for slug in worn_props.get(char_id, []):
            prop_entry = props_bible.get(slug) if isinstance(props_bible, dict) else None
            prop_desc = (
                str(prop_entry.get("description") or "").strip()
                if isinstance(prop_entry, dict) else ""
            )
            state_notes = (
                str(prop_entry.get("state_notes") or "").strip()
                if isinstance(prop_entry, dict) else ""
            )
            if state_notes and state_notes not in prop_desc:
                prop_desc = f"{prop_desc}; {state_notes}" if prop_desc else state_notes
            label = str(slug).replace("_", " ")
            base = f"{base} Wears the {label}: {prop_desc}" if prop_desc else f"{base} Wears the {label}."
        out[char_id] = base
    return out


def _load_bible(paths: ProjectPaths) -> dict:
    try:
        path = paths.global_bible_path
        if path.is_file():
            return json.loads(path.read_text(encoding="utf-8"))
    except Exception:  # noqa: BLE001 - descriptions are prompt enhancement only
        return {}
    return {}


def _load_bible_strict(paths: ProjectPaths) -> dict:
    """Bible loader for the WORN-PROP gate (REC-213 C4). Unlike _load_bible
    (prompt-enhancement, fail-soft), worn-ness CONTROLS whether a prop ref is
    emitted, so a silent {} would re-emit the wrong standalone ref. Contract:
    ABSENT path -> {} (no worn props; current behavior). PRESENT but unparseable
    or shape-malformed -> raise BoardBuilderError (never leak a raw
    JSONDecodeError/TypeError/AttributeError from the gate)."""
    path = paths.global_bible_path
    if not path.is_file():
        return {}
    try:
        bible = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise BoardBuilderError(f"global bible present but unreadable/unparseable: {exc}") from exc
    if not isinstance(bible, dict):
        raise BoardBuilderError("global bible must be a JSON object")
    # A PRESENT 'props' (even explicit null) that isn't a dict is malformed → raise.
    # Use key-presence, NOT .get() (which collapses an explicit `props: null` to None and
    # would fail-OPEN: the gate then sees no props and re-emits the standalone ref).
    if "props" in bible and not isinstance(bible["props"], dict):
        raise BoardBuilderError("global bible 'props' must be an object")
    props = bible.get("props") or {}
    for slug, entry in (props or {}).items():
        if not isinstance(entry, dict):
            raise BoardBuilderError(f"global bible prop {slug!r} must be an object")
        worn = entry.get("attached_to")
        if worn is not None and not isinstance(worn, str):
            raise BoardBuilderError(f"global bible prop {slug!r} attached_to must be a string or null")
    return bible


def _first_sentence(text: str) -> str:
    if not text:
        return ""
    match = re.search(r"(?<=[.!?])\s+", text.strip())
    if match:
        return text[: match.start()].strip()
    return text.strip()


def _project_relative(paths: ProjectPaths, path: Path) -> str:
    try:
        return str(path.relative_to(paths.project_root))
    except ValueError:
        return str(path)


def _assert_valid_size_override(size_override: str) -> None:
    try:
        w_text, h_text = size_override.lower().split("x", 1)
        width = int(w_text)
        height = int(h_text)
    except (ValueError, AttributeError) as exc:
        raise BoardBuilderError(f"invalid size_override: {size_override!r}") from exc
    if width % 16 or height % 16:
        raise BoardBuilderError(f"invalid size_override {size_override}: dimensions must be multiples of 16")
    if width > 3840 or height > 3840:
        raise BoardBuilderError(f"invalid size_override {size_override}: dimension exceeds 3840")
    if width * height > 8_294_400:
        raise BoardBuilderError(f"invalid size_override {size_override}: pixel count exceeds gpt-image-2 limit")
    ratio = max(width / height, height / width)
    if ratio > 3:
        raise BoardBuilderError(f"invalid size_override {size_override}: ratio exceeds 3:1")


for _size, _cols, _rows in GRID_LAYOUTS.values():
    _assert_valid_size_override(_size)


def _safe_composition_out_relpath(project_root: Path, out_path: str | Path) -> str | None:
    """Return a SAFE project-relative POSIX path for the composited view, or ``None`` if
    ``out_path`` escapes ``project_root`` (an absolute path outside root, or any ``..``).

    Resolve through existing filesystem components so a symlinked parent cannot redirect
    the write outside the project tree.
    """
    path = Path(out_path)
    if ".." in path.parts:
        return None  # any '..' escape (relative or absolute) → reject
    root = project_root.resolve()
    if path.is_absolute():
        candidate = path
        try:
            path.relative_to(project_root)
        except ValueError:
            return None  # absolute, outside root → reject
    else:
        candidate = project_root / path
    resolved = candidate.resolve(strict=False)
    try:
        rel = resolved.relative_to(root)
    except ValueError:
        return None
    return rel.as_posix()


def _safe_existing_project_file(project_root: Path, artifact: str) -> Path | None:
    artifact_path = Path(artifact)
    if artifact_path.is_absolute() or ".." in artifact_path.parts:
        return None
    root = project_root.resolve()
    candidate = project_root / artifact_path
    resolved = candidate.resolve(strict=False)
    try:
        resolved.relative_to(root)
    except ValueError:
        return None
    if not resolved.is_file():
        return None
    return resolved


def render_composition_view(manifest, project: str, *, out_path: str | Path) -> str:
    """Composite a ``CompositionManifest`` into a strip/grid PNG — a pure on-demand VIEW.

    Resolves each member atom-version URN to its EXISTING artifact (Phase 0
    ``workspace.readmodel.resolve_atom_version``) and pastes the panels row-major into the
    ``GRID_LAYOUTS[slots]`` geometry. **ZERO generation, ZERO model/dispatch call** — a
    reorder of ``manifest.members`` + re-render yields a recomposited strip in the new order
    with no generation. Returns the POSIX project-relative path of the written PNG.

    This function is the layout-vocabulary validation boundary: the schema accepts any dict
    ``layout`` (avoiding a board_builder→schema cycle), so a ``slots`` value absent from
    ``GRID_LAYOUTS`` or a member count exceeding the slots is rejected HERE with
    ``BoardBuilderError``. A member whose artifact is missing also raises
    ``BoardBuilderError`` naming the URN — it NEVER generates a replacement.
    """
    # 1. Layout-vocabulary validation (the boundary the pure schema deferred to the renderer).
    # Default slots to the production board rule (4-up, or 6-up past 4) — NOT len(members),
    # which is off-grid {2,3} for a small board with the schema-default empty layout. An
    # explicit layout["slots"] overrides and is validated against GRID_LAYOUTS below.
    if "slots" in manifest.layout:
        slots = manifest.layout["slots"]
    else:
        slots = 4 if len(manifest.members) <= 4 else 6
    if slots not in GRID_LAYOUTS:
        raise BoardBuilderError(
            f"composition layout slots={slots!r} is not a GRID_LAYOUTS key "
            f"{sorted(GRID_LAYOUTS)}: cannot composite off-grid"
        )
    if len(manifest.members) > slots:
        raise BoardBuilderError(
            f"composition has {len(manifest.members)} members but layout slots={slots}: "
            "refusing an off-grid paste"
        )

    # 2. Grid geometry from the selected layout.
    size_override, cols, rows = GRID_LAYOUTS[slots]
    width_text, height_text = size_override.split("x", 1)
    width, height = int(width_text), int(height_text)
    cell_w, cell_h = width // cols, height // rows

    # 3. Resolve each member to its EXISTING artifact and paste it into its row-major cell.
    #    Lazy import: board_builder is a pipeline._lib CONSUMER of the resolver, and
    #    workspace.readmodel already imports pipeline — a module-level import here would
    #    reverse that direction and risk a cycle.
    from recoil.workspace.readmodel import resolve_atom_version

    project_root = ProjectPaths.for_project(project).project_root
    canvas = Image.new("RGB", (width, height))
    for index, urn in enumerate(manifest.members):
        artifact = resolve_atom_version(urn, project).artifact
        if not artifact:
            raise BoardBuilderError(
                f"composition member {urn!r} resolves to no artifact: "
                "refusing to generate a replacement"
            )
        panel_path = _safe_existing_project_file(project_root, artifact)
        if panel_path is None:
            raise BoardBuilderError(
                f"composition member {urn!r} artifact {artifact!r} is missing or outside "
                "the project root: "
                "refusing to generate a replacement"
            )
        panel = Image.open(panel_path).convert("RGB").resize((cell_w, cell_h))
        row, col = divmod(index, cols)
        canvas.paste(panel, (col * cell_w, row * cell_h))

    # 4. out_path safety: keep the write under project_root; reject abs-outside-root / '..'.
    rel_out = _safe_composition_out_relpath(project_root, out_path)
    if rel_out is None:
        raise BoardBuilderError(
            f"composition out_path {out_path!r} escapes project root {project_root}"
        )
    abs_out = project_root / rel_out
    abs_out.parent.mkdir(parents=True, exist_ok=True)
    canvas.save(abs_out)
    return rel_out
