"""EpisodeRunner — the unified orchestrator (CP-10).

Iterates Scenes → Beats → Takes for one project/episode. Replaces
ProductionLoop, run_episode.py, and ClientSequenceRunner.

State (Hak):
  - In-memory: live Scene tree, BudgetGuard, asyncio semaphore.
  - On disk: per-Scene JSON via pipeline.core.persistence.

Feedback (Hak):
  - ops.log.jsonl structured events.
  - Scene JSON status property (atomic-written after each Take).

Retry strategy (post-june-refactor):
  - Optional StrategyEngine injection via __init__.
  - Failed Takes classified via detect_failure_mode, strategy selected via
    StrategyEngine.select_and_apply, StrategyDiff applied to next workflow.
  - Without StrategyEngine, behavior is unchanged (blind retry).

Deletion impact (Hak):
  - No autonomous run. Workspace submit_sequence_generation breaks.
  - Manual fire requires hand-rolled Python.
"""
from __future__ import annotations

import asyncio
import copy
import dataclasses
import logging
import os
import re
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

from recoil.core.atomic_write import jsonl_append_locked
from recoil.core.critic import FailureMode
from recoil.core.paths import ProjectPaths
from recoil.core.ref_errors import MissingBoardRefError
from recoil.pipeline.core.dispatch_context import DispatchContext
from recoil.pipeline.core.take import Scene, Beat
from recoil.pipeline.core.workflow import Workflow, WorkflowStep
from recoil.pipeline.core.persistence import (
    SceneVersionConflictError,
    active_version,
    load_manifest,
    load_scene_active_with_version,
    save_active_scene,
    init_scenes_from_plan,
)
from recoil.pipeline._lib.recoil_bridge import load_project_config
from recoil.pipeline._lib.sublocation_registry import validate_ref_file
from recoil.pipeline.tools.find_broken_refs import check_paths

from recoil.pipeline._lib.budget_manager import BudgetGuard
from recoil.pipeline._lib.cost import cost_per_second
from recoil.pipeline._lib import ops_log
from recoil.pipeline._lib import derivation_manifest
from recoil.pipeline._lib.derivation_sha import content_sha, plan_structural_sha
from recoil.pipeline._lib.filter_safety import (
    FILTER_SAFETY_LABELS_PATH,
    build_flag_label,
)
from recoil.pipeline._lib.grouping import (
    Group,
    GroupingContext,
    GroupingIdentity,
    get_grouping,
)

from recoil.pipeline.orchestrator.production_types import RetryCostPolicy

from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from recoil.core.critic import FailureMode
    from recoil.pipeline.orchestrator.strategy_registry import (
        StrategyEngine,
        StrategyDiff,
    )
    from recoil.execution.step_types import PassResult
    from recoil.pipeline.orchestrator.coverage_planner import CoveragePass

logger = logging.getLogger(__name__)


# Phase 3 scene-lock audit (live production save_scene callers, 2026-06-13):
# | caller | purpose | classification |
# | recoil/pipeline/_lib/board_builder.py:417 | persist proposed board metadata after storyboard render/gate | IN-PLACE UPDATE |
# | recoil/pipeline/_lib/board_builder.py:930 | persist selected reroll board metadata/status | IN-PLACE UPDATE |
# | recoil/pipeline/cli/generate.py:751 | persist approve/reject board decision | IN-PLACE UPDATE |
# | recoil/pipeline/cli/generate.py:785 | clear stale photoreal artifact before retrying finish | IN-PLACE UPDATE |
# | recoil/pipeline/cli/generate.py:823 | persist photoreal finish artifact after approval | IN-PLACE UPDATE |
# | recoil/pipeline/orchestrator/episode_runner.py:84 | run-state helper for takes/recovery/status saves | IN-PLACE UPDATE |
# | recoil/pipeline/orchestrator/episode_runner.py:693 | persist stale primary cleanup before reroll | IN-PLACE UPDATE |
# | recoil/pipeline/orchestrator/episode_runner.py:1845 | persist failed take state during dispatch | IN-PLACE UPDATE |
# | recoil/pipeline/orchestrator/episode_runner.py:2052 | persist take completion/strategy state during dispatch | IN-PLACE UPDATE |
# | recoil/pipeline/core/persistence.py:110,115,129 | implementation calls inside save_scene_guarded | UNRELATED/DEAD |
# | recoil/pipeline/core/persistence.py:188,204 | legacy plan init/max_takes propagation | IN-PLACE UPDATE |
# | recoil/pipeline/cli/generate.py:_run_scene_lock | manual lock/unlock scene lifecycle verb | IN-PLACE UPDATE |
# | EpisodeRunner.run_episode_batches pre-edit _save_scene_for_run(scene,path) | persist fresh scene_clusterer/grouping output to ep_{NNN}_{scene_id}.json before dispatch | RE-DERIVATION |
# The destructive re-derivation writer was the non-reroll per-batch write in
# EpisodeRunner.run_episode_batches: fresh scene_clusterer/grouping output
# persisted to ep_{NNN}_{scene_id}.json before dispatch.
# REC-231 Phase 2 made it NON-destructive (the `derive_only` branch appends an
# immutable candidate version via SceneVersionStore.write_scene_candidate), and
# Phase 7 DELETED the force/lock clobber gate (the force-overwrite CLI flag + the
# save_scene_guarded `.pre-force-*.bak` overwrite-of-locked branch). The runner no
# longer calls save_scene_guarded at all; `scene.locked` is inert UX metadata.


# REC-231 Phase 4: the module-level `_save_scene_for_run` (a raw `save_scene` to the
# flat path) is RETIRED. Every in-place take/board STATUS save now routes through
# `EpisodeRunner._persist_active_status` → `save_active_scene`, which targets the
# ACTIVE version body and enforces structure-immutability. Dry-run stays read-only.


def _scene_manifest_payload(scene: Scene) -> dict:
    if hasattr(scene, "model_dump"):
        return scene.model_dump()
    return scene.to_dict()


def _preferred_board_artifact(board: dict | None, *, project_root: Path) -> str | None:
    """Forward to ``board_builder.preferred_board_artifact`` and return its result.

    The ``board_builder`` import is deferred to call time
    (NOTE(review): presumably to avoid an import cycle — confirm).
    """
    from recoil.pipeline._lib import board_builder

    return board_builder.preferred_board_artifact(board, project_root=project_root)


def _board_ref_for_payload(
    board: dict | None,
    *,
    project_root: Path,
    board_gated: bool,
) -> str | None:
    """Pick the board reference string to carry in a dispatch payload.

    The preferred (resolved) artifact wins whenever there is one. When there is
    none, the raw ``board["artifact"]`` string is used as a fallback — but only
    for an un-gated run with a dict board; a gated run or a non-dict board
    yields the (empty) resolved value unchanged.
    """
    resolved = _preferred_board_artifact(board, project_root=project_root)
    if resolved:
        return resolved
    if board_gated or not isinstance(board, dict):
        return resolved
    fallback = board.get("artifact")
    if isinstance(fallback, str) and fallback:
        return fallback
    return None


# Phase 5F (failed-but-billed accounting): patterns in receipt.error that
# indicate fal billed the request even though no usable artifact came back.
# We treat these as failed_but_billed and charge the budget tally — matches
# how content_policy_violation / partner_validation_failed actually bill.
# NOTE(review): presumably matched as substrings of receipt.error — confirm at
# the use site (not visible in this part of the file).
_FAILED_BUT_BILLED_PATTERNS: tuple[str, ...] = (
    "content_policy_violation",
    "partner_validation_failed",
)


class PayloadBuildError(Exception):
    """Signals that _build_workflow_for_beat could not build a dispatch payload.

    Phase 4c: raised instead of emitting a silent stub payload, so a build
    problem is reported at once (dry-run included) rather than surfacing later
    in the provider adapter as a generic error that hides the real cause.

    Attributes:
        beat_id: id of the beat whose payload failed to build.
        cause: the underlying exception.
    """

    def __init__(self, beat_id: str, cause: Exception):
        message = f"Payload build failed for beat={beat_id!r}: {cause}"
        super().__init__(message)
        self.beat_id = beat_id
        self.cause = cause


class RerollPreflightError(Exception):
    """Structured preflight failure for the r2v_multi new-take path.

    Attributes:
        error_code: machine-readable failure code supplied by the raiser.
    """

    def __init__(self, error_code: str, message: str):
        super().__init__(message)
        self.error_code = error_code


class BoardGateError(Exception):
    """Raised when an opted-in r2v_multi beat lacks an approved storyboard.

    Attributes:
        beat_id: id of the blocked beat.
        reason: short reason code for the block.

    A falsy ``message`` (``None`` or empty) is replaced by a default text built
    from ``beat_id`` and ``reason``.
    """

    def __init__(self, beat_id: str, reason: str, message: str | None = None):
        if not message:
            message = f"Storyboard board gate blocked beat {beat_id}: {reason}"
        super().__init__(message)
        self.beat_id = beat_id
        self.reason = reason


class SceneContentDriftError(Exception):
    """REC-235 fail-closed guard for content drift on a LIVE (dispatching) run.

    Raised when a same-topology shot.raw/script content change is detected.
    It blocks paid dispatch of content that has not been captured as a scene
    version (Writer 1 ignores raw, so the content would dispatch unversioned).
    The graceful stage-a-candidate UX is deferred to REC-235; this path simply
    fails closed.

    Attributes:
        beat_id: id of the beat whose content changed.
        batch_id: id of the batch the beat belongs to.
    """

    def __init__(self, beat_id: str, batch_id: str, message: str | None = None):
        if not message:
            message = (
                f"scene content for beat {beat_id} (batch {batch_id}) changed "
                "(shot.raw/script) — run `rederive --conform` before generating"
            )
        super().__init__(message)
        self.beat_id = beat_id
        self.batch_id = batch_id


# Environment override for the storyboard approval gate, read by
# board_gate_enabled: "1" forces the gate on, "0" forces it off, and any other
# value (or unset) defers to the project config.
_BOARD_GATE_ENV_VAR = "RECOIL_BOARD_GATE"
# Project-config key whose value lists the episode numbers that have the gate
# enabled (board_gate_enabled tests `episode in cfg[key]`).
_BOARD_GATE_CONFIG_KEY = "board_gate_episodes"


def _episode_int(episode: int | str) -> int:
    token = str(episode)
    if token.startswith("ep_"):
        token = token.removeprefix("ep_")
    return int(token)


def board_gate_enabled(project: str, episode: int) -> bool:
    """Return whether the per-episode storyboard approval gate is active.

    The environment override wins: ``"1"`` enables and ``"0"`` disables the
    gate outright. Any other value falls through to the project config, where
    the gate is active when ``episode`` appears in the configured episode list.
    """
    forced = os.environ.get(_BOARD_GATE_ENV_VAR)
    if forced in ("1", "0"):
        return forced == "1"
    config = load_project_config(project) or {}
    gated_episodes = config.get(_BOARD_GATE_CONFIG_KEY) or []
    return episode in gated_episodes


def _beat_render_attempt_take_count(beat: Beat) -> int:
    """Count a beat's takes that represent an actual dispatch/render attempt.

    Excludes build-failure STUB takes (the except-PayloadBuildError branch at
    _dispatch_one_beat ~1780 appends a status="failed" take carrying
    take_metadata["build_error"] WITHOUT dispatching anything, only so the beat
    advances its take count). Counting those would mark a beat that rendered
    nothing as rendered (B2 over-broad, beat level).
    """
    return sum(
        1 for t in beat.takes if "build_error" not in (t.take_metadata or {})
    )


def _beat_dailies_shot_ids(beat: Beat) -> list[str]:
    """Canonical shot ids for one beat, matching the dailies comprehension
    (batch_summary.shot_ids, falling back to beat_id). After Phase 2 i2v beats
    carry batch_summary, so this yields canonical shot ids for r2v + i2v."""
    return list(
        beat.beat_metadata.get("batch_summary", {}).get(
            "shot_ids", [beat.beat_id]
        )
    )


def _sanitized_candidate_scene(scene: Scene) -> Scene:
    """Build a fresh Scene carrying only the structural fields of ``scene``.

    Each beat keeps its id, a deep copy of its metadata, its creation
    timestamp and its take cap; the scene keeps its id, a deep copy of its
    metadata and its creation timestamp. Nothing else is passed to the
    constructors, so every other field takes the Beat/Scene default.
    """
    clean_beats: list[Beat] = []
    for source in scene.beats:
        clean_beats.append(
            Beat(
                beat_id=source.beat_id,
                beat_metadata=copy.deepcopy(source.beat_metadata),
                created_at=source.created_at,
                max_takes=source.max_takes,
            )
        )
    return Scene(
        scene_id=scene.scene_id,
        beats=clean_beats,
        scene_metadata=copy.deepcopy(scene.scene_metadata),
        created_at=scene.created_at,
    )


def _dispatchable_beats_for_scene(
    scene: Scene,
    *,
    force_new_take: bool,
    reroll_beat_id: str | None,
) -> list[Beat]:
    return [
        beat for beat in scene.beats
        if (
            (
                force_new_take
                and reroll_beat_id is not None
                and beat.beat_id == reroll_beat_id
            )
            or
            (
                (
                    not force_new_take
                    or beat.beat_id == reroll_beat_id
                )
                and not beat.approved
                and not beat.is_exhausted
                and beat.primary_take is None
            )
        )
    ]


def _board_gate_reason(beat: Beat) -> str:
    if beat.board is None:
        return "no_board"
    status = beat.board.get("status")
    if status == "proposed":
        return "board_proposed"
    if status == "rejected":
        return "board_rejected"
    return "no_board"


def _preflight_scene_version_freshness(
    project: str, episode: int, beats: list[Beat]
) -> None:
    """Block dispatch when an active scene version is ``not_derived`` (REC-231 Phase 5).

    A candidate is born ``not_derived``; conforming to it does NOT clear that (conform
    never touches ``downstream``), so a just-conformed-to candidate would otherwise
    dispatch against absent/mis-attributed boards. This consults the per-batch manifest:
    if the ACTIVE version's ``downstream`` is ``"not_derived"``, dispatch is BLOCKED
    until a re-board calls ``SceneVersionStore.mark_derived``.

    Scope — boardable beats ONLY: the block is BOARD-centric (a conformed candidate may
    carry stale/absent boards), and the entire board machinery — including ``mark_derived``,
    the SOLE clearer of ``not_derived`` — is ``r2v_multi``-scoped (``board_builder.
    _single_r2v_multi_beat`` raises on anything else). A non-boardable ``video_i2v``
    (below-threshold, per-shot) scene has NO board to re-derive, so ``not_derived`` is inert
    for it AND it has no clearer; blocking it would leave a conformed ``video_i2v`` candidate
    PERMANENTLY undispatchable. So non-``r2v_multi`` beats are skipped here — mirroring the
    board-APPROVAL gate's own ``r2v_multi`` filter in ``_preflight_board_gate``.

    Beat→batch resolution: each boardable beat's batch id is ``beat.beat_metadata["scene_id"]``
    (== the manifest ``batch_id``). The beats are grouped by that id; each distinct batch
    is checked once. **Fail-loud:** an r2v_multi beat missing ``beat_metadata["scene_id"]``
    raises (a silent skip would let an unmapped boardable beat bypass the block). A flat
    Case-A batch (``load_manifest`` → ``None``) has no ``downstream`` — no block, as today.
    """
    by_batch: dict[str, Beat] = {}
    for beat in beats:
        if (beat.beat_metadata or {}).get("modality") != "r2v_multi":
            continue  # non-boardable (e.g. video_i2v): no board to re-derive, no clearer
        batch_id = (beat.beat_metadata or {}).get("scene_id")
        if not batch_id:
            raise ValueError(
                f"_preflight_board_gate: beat {beat.beat_id!r} is missing "
                "beat_metadata['scene_id'] — cannot resolve its batch for the "
                "scene-version freshness check (fail-loud: a silent skip would let an "
                "unmapped beat bypass the not_derived block)"
            )
        by_batch.setdefault(batch_id, beat)
    for batch_id, beat in by_batch.items():
        manifest = load_manifest(project, episode, batch_id)
        if manifest is None:
            continue  # flat Case-A batch: no downstream state, no block (unchanged)
        av = active_version(manifest)
        entry = next(
            (v for v in manifest.get("versions", []) if v.get("version") == av), None
        )
        if entry is not None and entry.get("downstream") == "not_derived":
            raise BoardGateError(
                beat.beat_id,
                "active_version_not_derived",
                f"Storyboard board gate blocked beat {beat.beat_id}: active scene "
                f"version v{av} of batch {batch_id} is not_derived (a conformed-to "
                "candidate that has not been re-boarded) — re-board before dispatch",
            )


def _preflight_board_gate(
    *,
    project: str,
    episode: int | str,
    beats: list[Beat],
) -> None:
    """Run the pre-dispatch storyboard checks for ``beats``.

    Steps, in order:
      1. Scene-version freshness (always, even when the approval gate is off).
      2. If the approval gate is enabled for this project/episode, every
         ``r2v_multi`` beat must resolve to an approved board whose preferred
         artifact exists and validates as a usable reference file.

    Beats whose modality is not ``r2v_multi`` — including beats with no
    metadata at all — are skipped by the approval gate. An approved board
    returned by ``resolve_board_for_spend`` is written back to ``beat.board``.

    Args:
        project: project slug.
        episode: episode number or ``"ep_NNN"`` token.
        beats: beats about to be dispatched.

    Raises:
        BoardGateError: a beat is not approved (``board_ssot_not_approved`` when
            grouping identity is derivable, otherwise the board-state reason),
            or its approved artifact is missing/unusable (``no_board``); also
            propagated from the freshness check.
        ValueError: propagated from the freshness check.
    """
    episode_num = _episode_int(episode)
    # REC-231 Phase 5: scene-version freshness is UNCONDITIONAL — it runs BEFORE the
    # board_gate_enabled early-return, so a project/env that disables the board-APPROVAL
    # gate still cannot dispatch an active un-derived (never re-boarded) candidate.
    _preflight_scene_version_freshness(project, episode_num, beats)
    if not board_gate_enabled(project, episode_num):
        return
    from recoil.pipeline._lib.board_builder import resolve_board_for_spend

    project_root = ProjectPaths.for_project(project).project_root
    for beat in beats:
        # Guard against beat_metadata=None the same way the freshness check and
        # the grouping lookup below do, so such a beat is skipped, not crashed on.
        if (beat.beat_metadata or {}).get("modality") != "r2v_multi":
            continue
        approved, board = resolve_board_for_spend(project, episode_num, beat)
        if not approved:
            grouping = (beat.beat_metadata or {}).get("grouping") or {}
            if not isinstance(grouping, dict):
                grouping = {}
            # With grouping identity present the board SSOT could be derived,
            # so the failure is "not approved" rather than a local board state.
            derivable = bool(
                grouping.get("shotset_hash")
                or grouping.get("shot_ids")
            )
            reason = "board_ssot_not_approved" if derivable else _board_gate_reason(beat)
            raise BoardGateError(beat.beat_id, reason)
        if board is not None:
            beat.board = board
        artifact = _preferred_board_artifact(beat.board, project_root=project_root)
        if not artifact:
            raise BoardGateError(beat.beat_id, "no_board")
        artifact_path = Path(artifact).expanduser()
        if not artifact_path.is_absolute():
            # Relative artifacts are interpreted against the project root.
            artifact_path = project_root / artifact_path
        artifact_error = validate_ref_file(artifact_path)
        if artifact_error:
            raise BoardGateError(
                beat.beat_id,
                "no_board",
                f"Storyboard board gate blocked beat {beat.beat_id}: "
                f"approved board artifact is unusable: {artifact_error}",
            )
        # Freshness is resolved above by resolve_board_for_spend; the remaining
        # checks here are artifact availability and reference validity.


class RefIntegrityError(Exception):
    """REC-236 fail-closed gate: a resolved ref consumed by this dispatch is broken.

    "Broken" covers a missing path, a non-regular file, and a file that is too
    small or lacks image magic (which includes an un-materialized git-lfs
    pointer). The EpisodeRunner pre-submit preflight raises this AFTER ref
    resolution and BEFORE the paid provider submit, so nothing is spent
    rendering against a stub/text-file ref (the REC-125 class).

    The message enumerates every broken ref together with its LFS-aware
    remediation. Only refs THIS beat resolved (start_frame / reference_images /
    board ref) are in scope — an unrelated broken ref elsewhere does not block.

    Attributes:
        beat_id: id of the blocked beat.
        broken: the broken-ref records as passed in; each is read for
            ``path``, ``size``, ``reason`` and ``remediation``.
    """

    def __init__(self, beat_id: str, broken: list):
        self.beat_id = beat_id
        self.broken = broken
        details = [
            f"  - {item.path} ({item.size} bytes): {item.reason}\n    → {item.remediation}"
            for item in broken
        ]
        lines = "\n".join(details)
        header = (
            f"Ref-integrity preflight blocked beat={beat_id!r} before paid submit "
            f"— {len(broken)} broken resolved ref(s):\n"
        )
        super().__init__(f"{header}{lines}")


# REC-236: payload keys that carry a CONCRETE resolved ref FILE PATH this
# dispatch consumes. start_frame (i2v) is a single path str; reference_images
# (r2v) is a list[str] of resolved ref paths — an approved storyboard board ref
# is already APPENDED into reference_images by build_dispatch_payload, so it is
# covered here too. These are the resolved-refs scope (SYNTHESIS decision 2) —
# NOT a whole-tree scan.
def _resolved_ref_paths_from_workflow(workflow) -> list[Path]:
    """Collect the concrete resolved ref FILE PATHS a built workflow will submit.

    Reads the keys build_dispatch_payload populates with resolved on-disk ref
    paths: ``start_frame`` (str) and ``reference_images`` (list[str], which
    includes the appended board ref). Non-string / URL-ish values are skipped
    (only file-backed refs are checkable). Deduped, order-preserving.
    """
    seen: set[str] = set()
    out: list[Path] = []
    for step in workflow.steps:
        payload = step.payload if isinstance(step.payload, dict) else {}
        candidates: list = []
        sf = payload.get("start_frame")
        if isinstance(sf, str):
            candidates.append(sf)
        for ref in payload.get("reference_images") or []:
            if isinstance(ref, str):
                candidates.append(ref)
        for c in candidates:
            # Skip URL-ish refs (http(s)/data); only file-backed refs are checkable.
            if c.startswith(("http://", "https://", "data:")):
                continue
            if c in seen:
                continue
            seen.add(c)
            out.append(Path(c).expanduser())
    return out


class BudgetExhaustedError(Exception):
    """Raised when BudgetGuard signals over-cap; the caller saves and halts.

    Phase 5F added pre-flight context (beat_id, estimated, spent, budget,
    reserved) so the halt log explains WHY the run stopped rather than only how
    much was left. The legacy positional ``remaining`` is still accepted, which
    keeps the older test (test_budget_exhaustion_halts_scene) green.

    ``remaining`` resolves as: the explicit value when given; otherwise
    ``budget - spent`` when both are known; otherwise ``0.0``.

    The message uses the pre-flight form when ``beat_id`` is given and the
    legacy "remaining" form otherwise.
    """

    def __init__(
        self,
        remaining: float | None = None,
        *,
        beat_id: str | None = None,
        estimated: float | None = None,
        spent: float | None = None,
        budget: float | None = None,
        reserved: float | None = None,
        rendered_shot_ids: list[str] | None = None,
    ):
        if remaining is None:
            derivable = budget is not None and spent is not None
            remaining = budget - spent if derivable else 0.0
        self.remaining = remaining

        self.beat_id = beat_id
        self.estimated = estimated
        self.spent = spent
        self.budget = budget
        self.reserved = reserved
        # Always a private copy, even when the caller passed nothing.
        self.rendered_shot_ids: list[str] = list(rendered_shot_ids or [])

        if beat_id is None:
            text = f"Budget exhausted; remaining: ${self.remaining:.2f}"
        else:
            # Missing amounts are rendered as $0.00.
            amounts = {
                "est": estimated,
                "spent": spent,
                "reserved": reserved,
                "budget": budget,
            }
            rendered = ", ".join(
                f"{label}=${(value or 0.0):.2f}" for label, value in amounts.items()
            )
            text = f"Budget pre-flight blocked beat={beat_id!r}: " + rendered
        super().__init__(text)


class EpisodeRunner:
    """Top-level orchestrator. One instance per overnight invocation."""

    def __init__(
        self,
        *,
        project: str,
        plan: dict,
        casting: dict | None = None,
        max_takes: int = 3,
        budget_usd: float = 50.0,
        concurrency: int = 2,
        episode: str | None = None,
        model_override: str | None = None,
        tier_override: str | None = None,
        generate_audio: bool | None = None,
        step_runner=None,
        strategy_engine: StrategyEngine | None = None,
        retry_cost_policy: RetryCostPolicy | None = None,
    ) -> None:
        self.project = project
        self.episode = episode or "ep_001"
        self.plan = plan
        self.casting = casting or {}          # {slug: element_type}
        self.max_takes = max_takes
        self.budget_usd = budget_usd
        self.concurrency = concurrency
        self.model_override = model_override
        self.tier_override = tier_override
        self.generate_audio = generate_audio
        # BudgetGuard ctor uses limit_usd, not cap_usd.
        self.budget_guard = BudgetGuard(limit_usd=budget_usd, label="episode")
        self._semaphore = asyncio.Semaphore(concurrency)
        self._step_runner = step_runner      # injected; lazy-built if None
        self._strategy_engine = strategy_engine
        self._retry_cost_policy = retry_cost_policy or RetryCostPolicy()

    # ── Scene lifecycle ───────────────────────────────────────────

    def init_scene(self, seq_id: str) -> Scene:
        """Construct an empty Scene from plan. Pure — no I/O."""
        seq_spec = (self.plan.get("sequences") or {}).get(seq_id)
        if seq_spec is None:
            raise KeyError(f"Sequence {seq_id!r} not in plan")
        beats: list[Beat] = []
        for shot in seq_spec.get("shots", []):
            shot_id = shot.get("shot_id") or shot.get("id") or f"BEAT_{len(beats):02d}"
            beats.append(Beat(
                beat_id=str(shot_id),
                max_takes=self.max_takes,
                beat_metadata={"scene_id": seq_id, "plan_shot": shot},
            ))
        return Scene(
            scene_id=seq_id, beats=beats,
            scene_metadata={"episode": self.episode, "project": self.project},
        )

    def _persist_active_status(
        self, scene: Scene, *, expected_version: int, dry_run: bool = False
    ) -> None:
        """Persist this scene's STATUS to the ACTIVE version body (REC-231 Phase 4).

        The single status-save path for the runner: dry-run stays read-only (REC-100,
        no write), and a live save routes through the structure-guarded
        ``save_active_scene`` so it targets the version body the run loaded (never the
        raw flat path when versioned) and a structural delta on a VERSIONED batch is
        rejected (``SceneStructureImmutableError``). The runner owns the in-memory scene
        (with its in-flight takes), so ``save_active_scene`` writes it directly without a
        re-load — see its docstring.
        """
        if dry_run:
            return
        save_active_scene(
            self.project, self.episode, scene.scene_id, scene,
            expected_version=expected_version,
        )

    def _active_version_for_scene(self, scene: Scene) -> int:
        """Resolve the active version for ``scene`` (1 when no body/manifest exists).

        Used where the loaded version was not threaded from the caller (the version
        the run is operating on); ``save_active_scene`` re-checks it under the lock so a
        conform mid-run still raises a conflict rather than a stale write.
        """
        try:
            _, version = load_scene_active_with_version(
                self.project, self.episode, scene.scene_id
            )
            return version
        except FileNotFoundError:
            return 1

    def recover_stale_takes(self, scene: Scene, staleness_minutes: int = 5) -> int:
        """Mark `running` Takes older than staleness_minutes as `failed`.

        Avoids API idempotency complexity — fresh Takes are dispatched.
        Returns count of Takes recovered.
        """
        cutoff = datetime.now(timezone.utc) - timedelta(minutes=staleness_minutes)
        n = 0
        for beat in scene.beats:
            for take in beat.takes:
                if take.status != "running":
                    continue
                try:
                    created = datetime.fromisoformat(take.created_at.replace("Z", "+00:00"))
                except (ValueError, AttributeError):
                    continue
                if created < cutoff:
                    take.status = "failed"
                    n += 1
        return n

    def _take_video_artifact(self, take) -> "str | None":
        """Absolute path of a take's video output if its receipt recorded one.

        Resolves project-relative receipt paths against the project root.
        Returns None when no video step has an output_path.
        """
        from pathlib import Path
        from recoil.core.paths import projects_root

        for step in take.workflow.steps:
            if step.modality not in ("video_i2v", "r2v_multi"):
                continue
            rr = getattr(step.receipt, "run_result", None) if step.receipt else None
            op = getattr(rr, "output_path", None) if rr else None
            if not op:
                continue
            p = Path(op)
            if not p.is_absolute():
                p = projects_root() / self.project / op
            return str(p)
        return None

    def _primary_take_grouping_for_reroll(self, take) -> dict:
        from pathlib import Path
        from recoil.core.naming import parse_filename

        video = self._take_video_artifact(take)
        parsed = parse_filename(Path(video).name) if video else None
        if not parsed:
            raise RerollPreflightError(
                "reroll_segment_drift",
                "--new-take could not derive grouping identity from the primary "
                "take artifact filename.",
            )

        strategy = parsed.get("strategy")
        ordinal = parsed.get("ordinal")
        if strategy != "coverage":
            if parsed["strategy"] in ("continuity", "oner"):
                return {
                    "strategy": parsed["strategy"],
                    "ordinal": int(parsed["ordinal"]),
                    "shot_ids": parsed.get("shot_ids") or [],
                    "source_pass_id": None,
                }
            if strategy == "solo" and len(parsed.get("shot_ids") or []) > 1:
                raise RerollPreflightError(
                    "reroll_segment_drift",
                    "--new-take could not derive PASS identity from the "
                    "primary take artifact filename.",
                )
            raise RerollPreflightError(
                "reroll_non_coverage_deferred",
                "--new-take currently supports coverage-pass rerolls only; "
                "general non-coverage reroll is deferred to REC-111.",
            )
        if ordinal is None:
            raise RerollPreflightError(
                "reroll_segment_drift",
                "--new-take could not derive coverage ordinal from the primary "
                "take artifact filename.",
            )
        return {
            "strategy": "coverage",
            "ordinal": int(ordinal),
            "shot_ids": list(parsed.get("shot_ids") or []),
            "source_pass_id": None,
        }

    def _primary_take_pass_counter_for_reroll(self, take) -> int:
        return int(self._primary_take_grouping_for_reroll(take)["ordinal"])

    def invalidate_phantom_succeeded_takes(self, scene: Scene, *, mutate: bool = True) -> int:
        """Demote `succeeded` Takes whose video artifact is missing on disk.

        Guards the phantom-success class: a persisted scene can carry
        status=succeeded while the output file was never written, was evicted,
        or belongs to stale state restored from an older tree. Trusting that
        status makes the runner skip dispatch and report a render that does not
        exist (the `feedback-harness-self-report-not-truth` failure). We fail
        loud and demote to `failed` so the beat re-dispatches a fresh take.
        Returns count demoted.
        """
        from pathlib import Path

        n = 0
        for beat in scene.beats:
            for take in beat.takes:
                if take.status != "succeeded":
                    continue
                video = self._take_video_artifact(take)
                ok = bool(video) and Path(video).is_file() and Path(video).stat().st_size > 0
                if ok:
                    continue
                if not mutate:
                    raise RerollPreflightError(
                        "reroll_requires_succeeded_primary",
                        "--new-take requires every existing succeeded primary take to have an artifact.",
                    )
                logger.warning(
                    "invalidate_phantom_succeeded_takes: take %s (beat %s) is "
                    "status=succeeded but artifact is missing/empty (%r) — "
                    "demoting to failed so it re-dispatches",
                    take.take_id, beat.beat_id, video,
                )
                self._demote_take(beat, take, phantom=True)
                n += 1
        return n

    def _demote_take(self, beat, take, *, phantom: bool = False) -> None:
        """Demote a take to ``failed`` and release it as primary if selected.

        Centralizes the demotion contract for every resume-time revalidation
        path (phantom-artifact here; fingerprint drift in REC-12). A take left
        as ``beat.primary_take_id`` is still returned by ``Beat.primary_take``
        regardless of status, and ``run_scene`` only dispatches beats where
        ``primary_take is None`` — so a demotion that fails to clear the primary
        selection silently prevents re-dispatch (REC-14).

        Pass ``phantom=True`` when the demotion is for a missing artifact so
        the beat is granted a bounded recovery slot (REC-19). Fingerprint-drift
        demotions (REC-12) leave the count unchanged — those are legitimate
        re-dispatches driven by ref changes, not phantom artifacts.
        """
        take.status = "failed"
        for step in take.workflow.steps:
            if step.modality in ("video_i2v", "r2v_multi"):
                step.status = "failed"
        if beat.primary_take_id == take.take_id:
            beat.primary_take_id = None
        if phantom:
            beat.grant_phantom_recovery()

    def revalidate_succeeded_fingerprints(self, scene: Scene, *, mutate: bool = True) -> int:
        """Re-run the Phase-8 ref-drift check on every SUCCEEDED beat at resume.

        REC-12: the dispatch-time drift check in ``_build_workflow_for_beat`` is
        dead on resume — ``run_scene`` only dispatches beats where
        ``primary_take is None``, so a beat that already succeeded is never
        rebuilt and a stale take is silently reused even after its resolved refs
        change (e.g. hero → composite-sheet promotion: same shot_id, different
        ref path).  We close that gap by recomputing the fingerprint from the
        CURRENT resolved refs for each succeeded beat and demoting (via
        ``_demote_take``) any whose stored fingerprint no longer matches, so
        ``run_scene`` re-dispatches them with the new refs.  The dispatch-time
        check stays as a backstop.

        Refs are resolved via ``build_dispatch_payload(dry_run=True)`` — the
        audit path resolves ``reference_images`` through the SAME
        ``_collect_reference_images`` call as the live payload, so the
        fingerprint is byte-identical, but skips the start_frame existence check
        and end_frame base64 encode.  This keeps the accepted cost of Option A
        (re-resolve on every resume) to ref-collection disk reads only — no
        billed dispatch, no base64.  A beat we cannot resolve (no shot metadata,
        or a build error) is left untouched so a transient resolve failure never
        silently discards a good take.

        Args:
            scene: Scene whose beats are inspected.
            mutate: ``True`` (default) demotes drifted primaries and refreshes
                each checked beat's stored ``inputs_fingerprint``. ``False``
                (reroll preflight) demotes nothing and writes no fingerprint;
                any drift or unverifiable beat raises instead.

        Returns:
            Number of primary takes demoted.

        Raises:
            RerollPreflightError: only when ``mutate`` is ``False`` — with
                ``reroll_refs_drifted`` when a succeeded beat has no shot
                metadata, its refs cannot be resolved, or its stored
                fingerprint differs from the freshly computed one.
        """
        from recoil.pipeline._lib.dispatch_payload import build_dispatch_payload
        from recoil.pipeline._lib.plan_loader import CanonicalShot

        def _unverifiable_error():
            # Same structured error for both "cannot verify" preflight exits.
            return RerollPreflightError(
                "reroll_refs_drifted",
                "Could not verify non-target succeeded beat integrity "
                "during reroll preflight.",
            )

        n = 0
        for beat in scene.beats:
            # Only succeeded beats are at risk — run_scene already re-dispatches
            # the rest, where the dispatch-time backstop fires.
            primary = beat.primary_take
            if primary is None or primary.status != "succeeded":
                continue

            modality = beat.beat_metadata.get("modality", "video_i2v")
            shot_data = beat.beat_metadata.get("shot")
            if shot_data is None:
                if not mutate:
                    raise _unverifiable_error()
                continue  # legacy beat with no shot metadata — cannot resolve refs
            batch_shots_data = (
                beat.beat_metadata.get("batch_shots")
                if modality == "r2v_multi" else None
            )
            shot = CanonicalShot(**shot_data) if isinstance(shot_data, dict) else shot_data
            batch_shots = (
                [CanonicalShot(**s) if isinstance(s, dict) else s for s in batch_shots_data]
                if batch_shots_data else None
            )
            board_gated = (
                beat.board_is_approved()
                and board_gate_enabled(self.project, _episode_int(self.episode))
            )

            try:
                payload = build_dispatch_payload(
                    shot=shot,
                    project=self.project,
                    modality=modality,
                    model_override=self.model_override,
                    tier_override=self.tier_override,
                    generate_audio=self.generate_audio,
                    episode=self.episode,
                    batch_shots=batch_shots,
                    dry_run=True,
                    skip_author=not mutate,
                    grouping=beat.beat_metadata.get("grouping"),
                    # Approved board rides into the drift fingerprint so that
                    # approve -> ref change -> demote -> re-dispatch fires for
                    # already-succeeded beats (the V-D re-render trigger).
                    board_ref_path=(
                        _board_ref_for_payload(
                            beat.board,
                            project_root=ProjectPaths.for_project(self.project).project_root,
                            board_gated=board_gated,
                        )
                        if beat.board_is_approved() else None
                    ),
                    board_gated=board_gated,
                )
            except Exception as exc:  # noqa: BLE001
                if mutate and board_gated and isinstance(exc, MissingBoardRefError):
                    logger.warning(
                        "revalidate_succeeded_fingerprints: beat %s is board-gated "
                        "but has no finished board ref; demoting succeeded primary "
                        "so board preflight blocks before dispatch",
                        beat.beat_id,
                    )
                    self._demote_take(beat, primary)
                    n += 1
                    continue
                # Resolve failure → leave the take as-is; the dispatch-time
                # backstop will surface the real error if/when it re-dispatches.
                logger.warning(
                    "revalidate_succeeded_fingerprints: could not resolve refs "
                    "for beat %s (%s) — skipping drift check", beat.beat_id, exc,
                )
                if not mutate:
                    raise _unverifiable_error() from exc
                continue

            # Explicit grouping: "(refs or [])" for dict payloads, else "[]".
            fresh_refs = (
                (payload.get("reference_images") or [])
                if isinstance(payload, dict) else []
            )
            fresh_fp = self._compute_inputs_fingerprint(fresh_refs)
            stored_fp = beat.beat_metadata.get("inputs_fingerprint", "")

            if stored_fp != fresh_fp:
                # Preflight (mutate=False) rejects ANY mismatch, including a
                # beat that never stored a fingerprint.
                if not mutate:
                    raise RerollPreflightError(
                        "reroll_refs_drifted",
                        "Resolved references drifted since a succeeded take completed.",
                    )
                # Resume: demote on ANY change from a non-empty stored
                # fingerprint — including drift to zero refs (fresh_fp == ""),
                # which is still real drift (a ref was removed). Requiring
                # fresh_fp truthy here would silently accept the stale take when
                # refs disappear (Codex, REC-12). An empty stored fingerprint is
                # treated as "never recorded" and is only backfilled below.
                if stored_fp:
                    logger.warning(
                        "revalidate_succeeded_fingerprints: inputs_fingerprint drift "
                        "on succeeded beat %s (stored=%s… fresh=%s…) — demoting "
                        "primary take so it rebuilds with updated refs",
                        beat.beat_id, stored_fp[:12], fresh_fp[:12],
                    )
                    self._demote_take(beat, primary)
                    n += 1

            # Refresh the stored fingerprint to the current refs so a second
            # resume is a no-op (mirrors the dispatch-time check's final write).
            if mutate:
                beat.beat_metadata["inputs_fingerprint"] = fresh_fp
        return n

    def _build_reroll_preflight_payload(self, beat: Beat) -> dict:
        """Build the dry-run dispatch payload used by the reroll preflight.

        Rehydrates the beat's persisted shot (and, for ``r2v_multi``, its batch
        shots) and calls ``build_dispatch_payload`` with ``dry_run=True`` and
        ``skip_author=True``, forwarding the runner's model/tier/audio/episode
        settings, the persisted grouping, and the approved-board reference.

        Raises:
            RerollPreflightError: ``new_take_requires_single_r2v_multi_beat``
                when the beat carries no ``shot`` metadata.
        """
        from recoil.pipeline._lib.dispatch_payload import build_dispatch_payload
        from recoil.pipeline._lib.plan_loader import CanonicalShot

        def _to_shot(raw):
            # Persisted shots may be plain dicts or already-built shot objects.
            return CanonicalShot(**raw) if isinstance(raw, dict) else raw

        metadata = beat.beat_metadata
        raw_shot = metadata.get("shot")
        if raw_shot is None:
            raise RerollPreflightError(
                "new_take_requires_single_r2v_multi_beat",
                f"--new-take target {beat.beat_id} has no shot metadata.",
            )

        modality = metadata.get("modality", "video_i2v")
        # Batch shots only apply to the multi-shot modality.
        raw_batch = metadata.get("batch_shots") if modality == "r2v_multi" else None
        head_shot = _to_shot(raw_shot)
        batch = [_to_shot(raw) for raw in raw_batch] if raw_batch else None

        board_gated = (
            beat.board_is_approved()
            and board_gate_enabled(self.project, _episode_int(self.episode))
        )
        board_ref_path = None
        if beat.board_is_approved():
            board_ref_path = _board_ref_for_payload(
                beat.board,
                project_root=ProjectPaths.for_project(self.project).project_root,
                board_gated=board_gated,
            )

        return build_dispatch_payload(
            shot=head_shot,
            project=self.project,
            modality=modality,
            model_override=self.model_override,
            tier_override=self.tier_override,
            generate_audio=self.generate_audio,
            episode=self.episode,
            batch_shots=batch,
            dry_run=True,
            skip_author=True,
            grouping=metadata.get("grouping"),
            board_ref_path=board_ref_path,
            board_gated=board_gated,
        )

    def _video_dir_for_reroll(self):
        from pathlib import Path

        direct = getattr(self._step_runner, "video_dir", None)
        if direct is not None:
            return Path(direct)
        paths = getattr(self._step_runner, "_paths", None)
        video_dir = getattr(paths, "video_dir", None)
        if video_dir is None:
            raise RerollPreflightError(
                "reroll_collision",
                "Could not resolve video directory for reroll collision preflight.",
            )
        return Path(video_dir)

    def prepare_beat_for_reroll(
        self, scene: Scene, beat: Beat, *, expected_version: int | None = None
    ) -> dict:
        """Release a stale primary (REC-120) ahead of a fresh reroll take.

        The primary counts as stale when it exists and either its status is not
        ``succeeded`` or its video artifact is absent/empty on disk (the
        phantom-success class — same artifact test as
        ``invalidate_phantom_succeeded_takes``). For a stale primary the
        selection is released and ``beat_metadata["reroll_cleared_stale"]`` is
        stamped with the cleared take_id; that marker is what lets
        ``_preflight_reroll`` tell a deliberately-cleared primary from one that
        never existed. The scene is persisted through the regular scene save
        path only when something was cleared, so the stamp is saved with it.

        Called ONLY by the new reroll entry points (``--batch`` CLI /
        ``POST /reroll``); every other reroll path keeps today's behavior.

        Returns:
            Dict with ``beat_id``, ``cleared_stale_primary`` (take_id or
            ``None``) and ``next_take_index`` (``len(beat.takes)``).
        """
        stale_take_id: str | None = None
        current = beat.primary_take
        if current is not None:
            artifact = self._take_video_artifact(current)
            has_artifact = False
            if artifact:
                artifact_path = Path(artifact)
                has_artifact = (
                    artifact_path.is_file() and artifact_path.stat().st_size > 0
                )
            if not (current.status == "succeeded" and has_artifact):
                stale_take_id = current.take_id
                beat.primary_take_id = None
                beat.beat_metadata["reroll_cleared_stale"] = stale_take_id

        if stale_take_id is not None:
            # REC-231 Phase 4: the cleared-primary STATUS must land on the
            # version the reroll caller LOADED this scene from (the threaded
            # `expected_version`), not on a pointer re-read at save time. If a
            # conform/revert slips in between the load and this clear, the save
            # has to raise SceneVersionConflictError under the lock instead of
            # writing the stale scene into a newly-active version. Only callers
            # that thread nothing (non-versioned) fall back to the active
            # pointer, mirroring run_scene's expected_version=None handling.
            version = (
                expected_version
                if expected_version is not None
                else self._active_version_for_scene(scene)
            )
            self._persist_active_status(scene, expected_version=version)

        return {
            "beat_id": beat.beat_id,
            "cleared_stale_primary": stale_take_id,
            "next_take_index": len(beat.takes),
        }

    def _preflight_reroll(
        self,
        scene: Scene,
        target_beat_id: str | None = None,
        allow_cleared_stale: bool = False,
    ) -> Beat:
        """Validate that ``scene`` is safe for a ``--new-take`` reroll.

        Checks run in this order, each raising ``RerollPreflightError`` on
        failure:

          1. exactly one ``r2v_multi`` beat exists and, when ``target_beat_id``
             is given, it is that beat;
          2. the beat has a succeeded primary whose video artifact is a
             non-empty file — unless ``allow_cleared_stale`` is set and
             ``prepare_beat_for_reroll`` stamped ``reroll_cleared_stale``, in
             which case the cleared-stale preflight handles the beat instead;
          3. the segment shot ids parsed from the primary artifact match the
             beat's current segment shot ids;
          4. the stored ``inputs_fingerprint`` matches a freshly resolved one;
          5. an episode token can be derived from the first segment shot id;
          6. neither the current nor the legacy filename for the next take
             already exists in the video directory.

        Returns:
            The target beat.
        """
        from recoil.core.naming import build_filename

        def _single_target_error():
            return RerollPreflightError(
                "new_take_requires_single_r2v_multi_beat",
                "--new-take requires exactly one r2v_multi target beat.",
            )

        def _succeeded_primary_error():
            return RerollPreflightError(
                "reroll_requires_succeeded_primary",
                "--new-take requires an existing succeeded primary take.",
            )

        r2v_beats = [
            candidate for candidate in scene.beats
            if candidate.beat_metadata.get("modality") == "r2v_multi"
        ]
        if len(r2v_beats) != 1:
            raise _single_target_error()
        (beat,) = r2v_beats
        if target_beat_id is not None and beat.beat_id != target_beat_id:
            raise _single_target_error()

        primary = beat.primary_take
        if primary is None or primary.status != "succeeded":
            # REC-120 cleared-stale branch: taken ONLY when
            # prepare_beat_for_reroll stamped the explicit marker AND the
            # primary is genuinely absent/unsucceeded. A healthy succeeded
            # primary never reaches this branch.
            if allow_cleared_stale and beat.beat_metadata.get("reroll_cleared_stale"):
                return self._preflight_reroll_cleared_stale(beat)
            raise _succeeded_primary_error()

        artifact = self._take_video_artifact(primary)
        if not artifact:
            raise _succeeded_primary_error()
        artifact_path = Path(artifact)
        if not (artifact_path.is_file() and artifact_path.stat().st_size > 0):
            raise _succeeded_primary_error()

        persisted_ids = self._primary_take_segment_shot_ids_for_reroll(beat)
        current_ids = self._segment_shot_ids_for_beat(beat)
        if persisted_ids != current_ids:
            raise RerollPreflightError(
                "reroll_segment_drift",
                "--new-take target segment shots changed since the succeeded take "
                f"(persisted={persisted_ids}, fresh={current_ids}).",
            )

        payload = self._build_reroll_preflight_payload(beat)
        fresh_fp = self._compute_inputs_fingerprint(
            payload.get("reference_images") or []
        )
        if beat.beat_metadata.get("inputs_fingerprint", "") != fresh_fp:
            raise RerollPreflightError(
                "reroll_refs_drifted",
                "Resolved references drifted since the primary take succeeded.",
            )

        if not persisted_ids:
            raise RerollPreflightError(
                "new_take_requires_single_r2v_multi_beat",
                "--new-take target has no r2v_multi segment shot IDs.",
            )
        episode_token = re.search(r"EP(\d+)", str(persisted_ids[0]))
        if not episode_token:
            raise RerollPreflightError(
                "reroll_collision",
                "Could not derive episode token for reroll filename preflight.",
            )

        next_take = len(beat.takes) + 1
        grouping = self._primary_take_grouping_for_reroll(primary)
        video_dir = self._video_dir_for_reroll()
        episode_number = int(episode_token.group(1))
        # Both filename generations are built before either is probed.
        candidates = (
            video_dir / build_filename(
                episode=episode_number,
                strategy=grouping["strategy"],
                ordinal=grouping["ordinal"],
                shot_ids=list(persisted_ids),
                take=next_take,
            ),
            video_dir / build_filename(
                episode=episode_number,
                pass_counter=grouping["ordinal"],
                shot_ids=list(persisted_ids),
                take=next_take,
            ),
        )
        for candidate in candidates:
            if candidate.exists():
                raise RerollPreflightError(
                    "reroll_collision",
                    f"Reroll target filename already exists: {candidate}",
                )
        return beat

    def _cleared_stale_grouping_from_metadata(self, beat: Beat) -> dict:
        """Resolve + validate the REC-102 grouping stamp for a cleared-stale reroll.

        Required shape: ``strategy`` in {coverage, continuity, oner}, an
        integer-coercible ``ordinal``, a ``shot_ids`` list (empty allowed only when
        already empty in metadata), and ``source_pass_id`` present or defaulted to
        None. Anything else is unresolvable.
        """
        grouping = beat.beat_metadata.get("grouping")
        if isinstance(grouping, dict):
            strategy = grouping.get("strategy")
            ordinal = grouping.get("ordinal")
            shot_ids = grouping.get("shot_ids")
            if (
                strategy in ("coverage", "continuity", "oner")
                and ordinal is not None
                and isinstance(shot_ids, list)
            ):
                try:
                    ordinal_int = int(ordinal)
                except (TypeError, ValueError):
                    ordinal_int = None
                if ordinal_int is not None:
                    return {
                        "strategy": str(strategy),
                        "ordinal": ordinal_int,
                        "shot_ids": list(shot_ids or []),
                        "source_pass_id": grouping.get("source_pass_id"),
                    }
        raise RerollPreflightError(
            "reroll_identity_unresolvable",
            "--new-take could not resolve grouping identity from a healthy primary "
            "artifact or persisted beat metadata.",
        )

    def _preflight_reroll_cleared_stale(self, beat: Beat) -> Beat:
        """Preflight a reroll whose stale primary was cleared (REC-120).

        No succeeded primary artifact exists, so grouping identity is derived from
        the persisted ``beat_metadata["grouping"]`` (REC-102 stamp) instead of the
        primary artifact filename. The prior-artifact comparison checks (segment
        drift vs primary, ref-fingerprint drift vs primary) are skipped, but the
        filename-collision preflight still runs against the metadata-derived
        grouping. Single-r2v-multi-target, no-extra-dispatchable-beats, and the
        budget gate are enforced by the caller / dispatch loop, unchanged.

        Segment shot ids come from ``_segment_shot_ids_for_beat`` and fall back
        to the grouping stamp's ``shot_ids``. When both are empty (REC-102 can
        stamp ``[]``), the beat's own ``shot`` metadata supplies the shot id;
        that metadata may be a dict or a shot object, matching how the other
        methods in this class read it.

        Returns:
            ``beat``, unchanged.

        Raises:
            RerollPreflightError: ``reroll_identity_unresolvable`` when the
                grouping stamp is invalid or no shot id can be found at all;
                ``reroll_collision`` when no ``EP<n>`` token can be derived or
                the next take's filename (current or legacy form) already
                exists.
        """
        from recoil.core.naming import build_filename

        grouping = self._cleared_stale_grouping_from_metadata(beat)
        segment_shot_ids = self._segment_shot_ids_for_beat(beat) or list(
            grouping["shot_ids"] or []
        )

        if segment_shot_ids:
            ep_source = str(segment_shot_ids[0])
        else:
            # Spec: an EMPTY shot_ids list is valid when the metadata genuinely
            # persisted it empty. build_filename rejects empty shot_ids
            # (ValueError), so derive both the episode source and the filename
            # list from the beat's shot id. The shot metadata is read
            # dict-or-object so an already-built shot object does not crash
            # with AttributeError on ``.get``.
            shot_meta = beat.beat_metadata.get("shot")
            head_shot_id = (
                shot_meta.get("shot_id")
                if isinstance(shot_meta, dict)
                else getattr(shot_meta, "shot_id", None)
            )
            ep_source = str(head_shot_id or "")
            if not ep_source:
                # No episode source exists anywhere — fail with the structured
                # error rather than crashing downstream.
                raise RerollPreflightError(
                    "reroll_identity_unresolvable",
                    "--new-take could not resolve grouping identity from a "
                    "healthy primary artifact or persisted beat metadata.",
                )
            segment_shot_ids = [ep_source]

        ep_match = re.search(r"EP(\d+)", ep_source)
        if not ep_match:
            raise RerollPreflightError(
                "reroll_collision",
                "Could not derive episode token for reroll filename preflight.",
            )
        episode_number = int(ep_match.group(1))
        take_number = len(beat.takes) + 1
        video_dir = self._video_dir_for_reroll()
        expected = video_dir / build_filename(
            episode=episode_number,
            strategy=grouping["strategy"],
            ordinal=grouping["ordinal"],
            shot_ids=list(segment_shot_ids),
            take=take_number,
        )
        legacy_expected = video_dir / build_filename(
            episode=episode_number,
            pass_counter=grouping["ordinal"],
            shot_ids=list(segment_shot_ids),
            take=take_number,
        )
        # Report whichever colliding name is found first, current form first.
        for candidate in (expected, legacy_expected):
            if candidate.exists():
                raise RerollPreflightError(
                    "reroll_collision",
                    f"Reroll target filename already exists: {candidate}",
                )
        return beat

    @staticmethod
    def _beat_would_dispatch_first_take(beat: Beat) -> bool:
        return (
            not beat.approved
            and not beat.is_exhausted
            and beat.primary_take is None
        )

    def _preflight_reroll_no_extra_dispatchable_beats(
        self,
        scene: Scene,
        target_beat_id: str,
    ) -> None:
        extras = [
            beat.beat_id for beat in scene.beats
            if beat.beat_id != target_beat_id
            and self._beat_would_dispatch_first_take(beat)
        ]
        if extras:
            raise RerollPreflightError(
                "new_take_requires_single_r2v_multi_beat",
                "--new-take target scene has non-target dispatchable beats.",
            )

    # ── Take dispatch (per Beat) ──────────────────────────────────

    @staticmethod
    def _derive_tag_for_beat(
        modality: str,
        shot,
        batch_shots,
    ) -> str:
        """Canonical semantic tag for a beat.

        Grammar token must satisfy [A-Z][A-Z0-9]*(_[A-Z][A-Z0-9]*)*.

        Rules (Phase 3 naming-reset):
          - r2v_multi coverage pass, shared character anchor → "A_<CHAR>"
          - r2v_multi coverage pass, env-only               → "COV_<LOC>"
                                                             or "COV_ENV"
          - video_i2v single shot, env-only                  → "SOLO_ENV"
          - video_i2v single shot, one character             → "SOLO_<CHAR>"
          - fallback (no signal)                             → "SOLO"
        """
        def _sanitize(token: str | None, default: str) -> str:
            if not token:
                return default
            cleaned = re.sub(r"[^A-Z0-9_]", "", str(token).upper().replace("-", "_"))
            cleaned = re.sub(r"_+", "_", cleaned).strip("_")
            if not cleaned or not re.match(r"^[A-Z][A-Z0-9]*(?:_[A-Z][A-Z0-9]*)*$", cleaned):
                return default
            return cleaned

        # Coverage / batched paths.
        if modality == "r2v_multi" and batch_shots:
            # Anchor character if all batch shots share one.
            char_ids: set[str] = set()
            loc_ids: set[str] = set()
            any_chars = False
            for s in batch_shots:
                chars = getattr(s, "characters", None) or []
                if chars:
                    any_chars = True
                    for c in chars:
                        cid = getattr(c, "char_id", None) or (
                            c.get("char_id") if isinstance(c, dict) else None
                        )
                        if cid:
                            char_ids.add(str(cid))
                loc = getattr(s, "location_id", None) or (
                    s.get("location_id") if isinstance(s, dict) else None
                )
                if loc:
                    loc_ids.add(str(loc))
            if len(char_ids) == 1:
                return f"A_{_sanitize(next(iter(char_ids)), 'CHAR')}"
            if not any_chars:
                if len(loc_ids) == 1:
                    return f"COV_{_sanitize(next(iter(loc_ids)), 'LOC')}"
                return "COV_ENV"
            # Mixed character batch → fall through to SOLO sentinel.
        # Single-shot / ad-hoc fallback.
        chars = getattr(shot, "characters", None) or []
        if not chars:
            return "SOLO_ENV"
        first = chars[0]
        cid = getattr(first, "char_id", None) or (
            first.get("char_id") if isinstance(first, dict) else None
        )
        if cid:
            return f"SOLO_{_sanitize(str(cid), 'CHAR')}"
        return "SOLO"

    @staticmethod
    def _segment_shot_ids_for_beat(beat) -> list[str]:
        metadata = beat.beat_metadata or {}
        segment_shot_ids = metadata.get("segment_shot_ids")
        if segment_shot_ids:
            return [str(sid) for sid in segment_shot_ids]

        batch_shots = metadata.get("batch_shots") or []
        shot_ids: list[str] = []
        for shot in batch_shots:
            if isinstance(shot, dict):
                source_ids = shot.get("source_shot_ids")
                if source_ids:
                    shot_ids.extend(str(sid) for sid in source_ids)
                    continue
                shot_id = shot.get("source_shot_id") or shot.get("shot_id")
            else:
                source_ids = getattr(shot, "source_shot_ids", None)
                if source_ids:
                    shot_ids.extend(str(sid) for sid in source_ids)
                    continue
                shot_id = getattr(shot, "source_shot_id", None) or getattr(
                    shot, "shot_id", None
                )
            if shot_id:
                shot_ids.append(str(shot_id))
        if shot_ids:
            return shot_ids

        primary = getattr(beat, "primary_take", None)
        workflow = getattr(primary, "workflow", None)
        for step in getattr(workflow, "steps", []) or []:
            payload = getattr(step, "payload", {}) or {}
            segment_ids = (
                payload.get("segment_shot_ids")
                if isinstance(payload, dict) else None
            )
            if segment_ids:
                return [str(sid) for sid in segment_ids]
        return shot_ids

    def _primary_take_segment_shot_ids_for_reroll(self, beat) -> list[str]:
        """Return the segment shot ids encoded in the primary take's artifact filename.

        The ids are taken from ``parse_filename`` applied to the artifact's
        base name and returned as strings.

        Raises:
            RerollPreflightError: ``reroll_requires_succeeded_primary`` when
                the beat has no succeeded primary or that primary has no video
                artifact; ``reroll_segment_drift`` when the filename yields no
                shot ids.
        """
        from recoil.core.naming import parse_filename

        primary = getattr(beat, "primary_take", None)
        succeeded = primary is not None and primary.status == "succeeded"
        # The artifact is only looked up for a succeeded primary.
        artifact = self._take_video_artifact(primary) if succeeded else None
        if not artifact:
            raise RerollPreflightError(
                "reroll_requires_succeeded_primary",
                "--new-take requires an existing succeeded primary take.",
            )

        parsed = parse_filename(Path(artifact).name)
        shot_ids = parsed.get("shot_ids") if parsed else None
        if not shot_ids:
            raise RerollPreflightError(
                "reroll_segment_drift",
                "--new-take could not derive target segment shots from the "
                "primary take artifact filename.",
            )
        return [str(sid) for sid in shot_ids]

    def _estimate_take_cost(self, beat: Beat) -> float:
        """Estimate the cost of one Take attempt on ``beat`` before dispatch.

        The estimate is ``cost_per_second(model) * total_duration_s``. For
        ``r2v_multi`` beats with batch shots the duration is summed over the
        batch; otherwise only the head shot's ``duration_s`` is used.

        Returns:
            The estimate, or ``0.0`` when the beat has no shot metadata, the
            per-second rate is not positive, or the total duration is not
            positive. Callers treat ``0.0`` as "no per-second gate possible"
            and fall through to the legacy post-hoc check.
        """
        from recoil.pipeline._lib.plan_loader import CanonicalShot

        def _to_shot(raw):
            # Persisted shots may be plain dicts or already-built shot objects.
            return CanonicalShot(**raw) if isinstance(raw, dict) else raw

        metadata = beat.beat_metadata
        modality = metadata.get("modality", "video_i2v")
        raw_shot = metadata.get("shot")
        if raw_shot is None:
            return 0.0
        head_shot = _to_shot(raw_shot)

        per_second = cost_per_second(
            self._resolve_model_id_for_beat(modality, head_shot, self.model_override)
        )
        if per_second <= 0.0:
            return 0.0

        # r2v_multi covers the whole batch; single-shot modalities use just the
        # head shot.
        raw_batch = metadata.get("batch_shots") if modality == "r2v_multi" else None
        if raw_batch:
            seconds = 0.0
            for raw in raw_batch:
                duration = getattr(_to_shot(raw), "duration_s", None)
                if duration is not None:
                    seconds += float(duration)
        else:
            duration = getattr(head_shot, "duration_s", None)
            seconds = float(duration) if duration is not None else 0.0

        if seconds <= 0.0:
            return 0.0
        return float(per_second) * seconds

    @staticmethod
    def _compute_inputs_fingerprint(
        reference_images: list,
        ref_manifest: dict | list | None = None,
    ) -> str:
        """Stable sha256 over RESOLVED ref paths — never shot IDs.

        Phase 8 (S2): detects when a beat's RESOLVED reference inputs have
        drifted (e.g. hero ref promoted to composite sheet — same shot_id,
        different ref path) so stale succeeded takes can be invalidated and
        rebuilt instead of reused silently.

        Hash is over the SORTED resolved ``reference_images`` paths (and
        sorted ``ref_manifest`` items if provided) — sorting ensures
        determinism regardless of order.  Returns "" for empty/no-ref beats
        so no false drift is triggered on beats that carry zero refs.

        CORRECTNESS INVARIANT: the hash is over the actual file paths
        (``payload["reference_images"]``), NOT over shot IDs.  A shot-ID-only
        hash would miss ref promotion (hero→sheet) and re-introduce the exact
        bug this phase prevents.
        """
        import hashlib

        paths: list[str] = sorted(str(p) for p in (reference_images or []))
        manifest_items: list[str] = sorted(str(m) for m in (ref_manifest or []))

        if not paths and not manifest_items:
            return ""

        h = hashlib.sha256()
        for p in paths:
            h.update(p.encode())
            h.update(b"\x00")
        for m in manifest_items:
            h.update(m.encode())
            h.update(b"\x01")
        return h.hexdigest()

    @staticmethod
    def _resolve_model_id_for_beat(
        modality: str,
        shot,
        model_override: str | None,
    ) -> str:
        """Resolve the canonical model_id used for filename derivation.

        Mirrors dispatch_payload.build_dispatch_payload's resolution order:
        explicit override → shot.video_model → DEFAULT_VIDEO_MODEL. Falls
        back to a literal string only as last resort — must be non-empty
        (build_filename's hard constraint).
        """
        if model_override:
            return str(model_override)
        candidate = getattr(shot, "video_model", None)
        if candidate:
            return str(candidate)
        try:
            from recoil.pipeline._lib.dispatch_payload import DEFAULT_VIDEO_MODEL
            return str(DEFAULT_VIDEO_MODEL)
        except Exception:  # noqa: BLE001
            return "seeddance-2.0"

    def _grouping_for_beat_metadata(
        self,
        beat: Beat,
        *,
        modality: str,
        batch_shots,
        grouping_override: dict | None = None,
        fallback_beat_index: int | None = None,
    ) -> dict:
        """Return the grouping identity dict for a beat.

        A declared grouping (``grouping_override`` if truthy, otherwise
        ``beat.beat_metadata["grouping"]``) is used when it is a dict carrying
        a truthy ``strategy`` and a non-None ``ordinal``. Otherwise one is
        synthesized: "continuity" with ordinal ``fallback_beat_index + 1`` for
        r2v_multi beats, "solo" with ordinal 0 for everything else.
        """
        declared = grouping_override or beat.beat_metadata.get("grouping")
        if isinstance(declared, dict):
            declared_strategy = declared.get("strategy")
            declared_ordinal = declared.get("ordinal")
            if declared_strategy and declared_ordinal is not None:
                identity = GroupingIdentity(
                    strategy=str(declared_strategy),
                    ordinal=int(declared_ordinal),
                    shot_ids=list(declared.get("shot_ids") or []),
                    source_pass_id=declared.get("source_pass_id"),
                )
                return identity.to_dict()

        # No usable declared grouping: collect member shot ids, preferring the
        # batch and falling back to the beat's head shot.
        member_ids = []
        for member in batch_shots or []:
            member_ids.append(getattr(member, "shot_id", str(member)))
        if not member_ids:
            head_shot = beat.beat_metadata.get("shot")
            if isinstance(head_shot, dict) and head_shot.get("shot_id"):
                member_ids = [str(head_shot["shot_id"])]
            elif getattr(head_shot, "shot_id", None):
                member_ids = [str(head_shot.shot_id)]

        if modality == "r2v_multi":
            synthesized_strategy = "continuity"
            synthesized_ordinal = (fallback_beat_index or 0) + 1
        else:
            synthesized_strategy = "solo"
            synthesized_ordinal = 0
        return GroupingIdentity(
            strategy=synthesized_strategy,
            ordinal=synthesized_ordinal,
            shot_ids=member_ids,
            source_pass_id=None,
        ).to_dict()

    def _build_workflow_for_beat(
        self,
        beat: Beat,
        take_index: int,
        beat_index: int | None = None,
        grouping_override: dict | None = None,
        strategy_override: str | None = None,
        reroll_note: str | None = None,
    ) -> Workflow:
        """Build a Workflow for one Take attempt on this Beat.

        Beats produced by run_episode_batches carry:
          beat.beat_metadata["modality"]      — "video_i2v" or "r2v_multi"
          beat.beat_metadata["shot"]          — CanonicalShot (head)
          beat.beat_metadata["batch_shots"]   — list[CanonicalShot]  (r2v_multi only)
          beat.beat_metadata["scene_id"]      — Batch.batch_id

        Grouping provenance is injected here and consumed by the r2v writer.

        Args:
            beat: Beat whose ``beat_metadata`` drives the build.
            take_index: Used only to form the workflow id
                (``{beat_id}_take_{take_index}``).
            beat_index: Forwarded to ``_grouping_for_beat_metadata`` as
                ``fallback_beat_index``.
            grouping_override: Forwarded to ``_grouping_for_beat_metadata``.
            strategy_override: Forwarded to ``build_dispatch_payload``.
            reroll_note: When not None, stamped into
                ``global_provenance["reroll_note"]``.

        Returns:
            A Workflow with a single step whose ``step_id`` is "video". When
            ``beat_metadata["shot"]`` is absent a minimal stub workflow is
            returned instead (no payload build, no scene-version stamp).

        Raises:
            PayloadBuildError: ``build_dispatch_payload`` raised anything
                other than BudgetExhaustedError.
            BudgetExhaustedError: re-raised unchanged from the payload build.
            ValueError: ``beat_metadata["scene_id"]`` is missing or empty
                (checked after the payload is built, non-stub path only).

        Side effects (non-stub path, dict payload only):
            - succeeded takes on ``beat`` are set to "failed" when the stored
              and freshly computed inputs fingerprints are both non-empty and
              differ;
            - ``beat.beat_metadata["inputs_fingerprint"]`` is overwritten with
              the fresh fingerprint.
        """
        from recoil.pipeline._lib.dispatch_payload import build_dispatch_payload

        from recoil.pipeline._lib.plan_loader import CanonicalShot

        modality = beat.beat_metadata.get("modality", "video_i2v")
        shot_data = beat.beat_metadata.get("shot")
        batch_shots_data = beat.beat_metadata.get("batch_shots") if modality == "r2v_multi" else None

        if shot_data is None:
            # Legacy init_scenes_from_plan beat (plan_shot only, no `shot`). This stub is
            # NOT a live take-emitter and so does NOT need the REC-231 batch_id/scene_version
            # provenance stamp: the only path that loads these plan_shot-only beats is
            # EpisodeRunner.run_episode, whose sole caller is a unit test
            # (test_episode_runner) — no CLI/API entrypoint reaches it. Live dispatch flows
            # through run_episode_batches, which ALWAYS supplies beat_metadata["shot"] (full
            # provenance), so a production take is never emitted from this stub.
            return Workflow(
                workflow_id=f"{beat.beat_id}_take_{take_index}",
                steps=[WorkflowStep(step_id="video", modality="video_i2v",
                                    payload={"shot_id": beat.beat_id})],
                global_provenance={"shot_id": beat.beat_id,
                                   "episode": self.episode},
            )

        # beat_metadata["shot"] is stored as a dict (dataclasses.asdict) for
        # JSON serialization compat; reconstruct CanonicalShot here.
        shot = CanonicalShot(**shot_data) if isinstance(shot_data, dict) else shot_data
        batch_shots = (
            [CanonicalShot(**s) if isinstance(s, dict) else s for s in batch_shots_data]
            if batch_shots_data else None
        )

        grouping = self._grouping_for_beat_metadata(
            beat,
            modality=modality,
            batch_shots=batch_shots,
            grouping_override=grouping_override,
            fallback_beat_index=beat_index,
        )
        tag = self._derive_tag_for_beat(modality, shot, batch_shots)
        model_id = self._resolve_model_id_for_beat(modality, shot, self.model_override)
        # The board gate applies only when the beat's board is approved AND
        # the gate is enabled for this project/episode.
        board_gated = (
            beat.board_is_approved()
            and board_gate_enabled(self.project, _episode_int(self.episode))
        )
        # A board ref is only resolved for an approved board; otherwise None
        # is passed through to the payload builder.
        board_ref_path = (
            _board_ref_for_payload(
                beat.board,
                project_root=ProjectPaths.for_project(self.project).project_root,
                board_gated=board_gated,
            )
            if beat.board_is_approved() else None
        )

        try:
            payload = build_dispatch_payload(
                shot=shot,
                project=self.project,
                modality=modality,
                model_override=self.model_override,
                tier_override=self.tier_override,
                generate_audio=self.generate_audio,
                episode=self.episode,
                batch_shots=batch_shots,
                prompt_directive=beat.beat_metadata.get("prompt_directive"),
                grouping=grouping,
                generation_config=beat.beat_metadata.get("generation_config"),
                element_config=beat.beat_metadata.get("element_config"),
                strategy_override=strategy_override,
                board_ref_path=board_ref_path,
                board_gated=board_gated,
            )
        except BudgetExhaustedError:
            # Never wrap a budget-cap signal as a payload error — it must halt the
            # run, not be swallowed as a failed take (Debug R1 / Gemini F2,
            # defensive: not currently raised in the wrapped region).
            raise
        except Exception as exc:  # noqa: BLE001
            # Phase 4c: raise typed PayloadBuildError instead of returning a
            # stub payload that silently propagates to the provider adapter.
            # This surfaces the root cause immediately — including in dry-run —
            # and avoids wasting a billed dispatch.
            raise PayloadBuildError(beat_id=beat.beat_id, cause=exc) from exc

        if isinstance(payload, dict):
            # Overlay the beat's generation_config onto the built payload: a
            # non-None value overwrites the key, an explicit None removes it,
            # and an absent key leaves the payload untouched.
            gen_config = beat.beat_metadata.get("generation_config") or {}
            for key in (
                "tier", "seed", "resolution", "format_mode",
                "anchor_duration_s", "regen_previz_for_segment",
            ):
                if key in gen_config:
                    val = gen_config[key]
                    if val is not None:
                        payload[key] = val
                    else:
                        payload.pop(key, None)

            # seed/tier are additionally copied under provider_hints (same
            # routing as _apply_strategy_diff uses for retry overlays).
            for _hk in ("seed", "tier"):
                if gen_config.get(_hk) is not None:
                    payload.setdefault("provider_hints", {})[_hk] = gen_config[_hk]

            # Merge (not replace) the beat's element_config into the payload's.
            element_config = beat.beat_metadata.get("element_config") or {}
            if element_config:
                ec = payload.setdefault("element_config", {})
                ec.update(element_config)

            # grouping/tag always overwrite; model only fills a gap so a model
            # already chosen by the payload builder is kept.
            payload["grouping"] = dict(grouping)
            payload["tag"] = tag
            payload.setdefault("model", model_id)

        # Phase 8 (S2): inputs_fingerprint — detect ref drift at dispatch-time.
        #
        # Approach (b) chosen over (a): refs are ALREADY resolved here (payload
        # was just built from them), so we hash once at dispatch instead of
        # re-resolving a second time inside the load gate.  The drift check is
        # therefore cheap and definitively correct.
        #
        # The fingerprint is computed over payload["reference_images"] — the
        # ACTUAL resolved file paths — NEVER over shot IDs.  A shot-ID hash
        # would miss hero→composite-sheet promotion (same shot_id, different
        # ref path) and silently reuse stale takes.
        #
        # If the stored fingerprint differs from the freshly-computed one AND
        # the beat already has succeeded takes → those takes are invalidated
        # (demoted to "failed") so the beat re-dispatches with the new refs.
        #
        # Storyboards intentionally ride in payload["reference_images"] after
        # approval. That changes the fingerprint and is the designed re-render
        # trigger: approve board -> attach board ref -> dispatch fresh take.
        if isinstance(payload, dict):
            fresh_refs = payload.get("reference_images") or []
            fresh_fp = self._compute_inputs_fingerprint(fresh_refs)
            stored_fp = beat.beat_metadata.get("inputs_fingerprint", "")
            # Both sides must be non-empty: a beat with no stored fingerprint
            # or no refs at all never counts as drifted.
            if stored_fp and fresh_fp and stored_fp != fresh_fp:
                # Ref drift detected.  Invalidate any succeeded takes so they
                # rebuild with the updated refs instead of being reused.
                drifted = 0
                for t in beat.takes:
                    if t.status == "succeeded":
                        t.status = "failed"
                        drifted += 1
                if drifted:
                    logger.warning(
                        "_build_workflow_for_beat: inputs_fingerprint drift on "
                        "beat %s (stored=%s… fresh=%s…) — demoted %d succeeded "
                        "take(s) to failed so they rebuild with updated refs",
                        beat.beat_id, stored_fp[:12], fresh_fp[:12], drifted,
                    )
            # Always record the fresh fingerprint (possibly "") on the beat.
            beat.beat_metadata["inputs_fingerprint"] = fresh_fp

        steps = [
            WorkflowStep(
                step_id="video",
                modality=modality,
                payload=payload,
            ),
        ]
        global_provenance = {
            "shot_id": beat.beat_id,
            "scene_id": beat.beat_metadata.get("scene_id"),
            "episode": self.episode,
            "project": self.project,
            "modality": modality,
            "grouping": dict(grouping),
            "tag": tag,
            "model": model_id,
        }
        # REC-231 Phase 5: stamp {batch_id, scene_version, beat_id} so the downstream
        # Take/receipt provenance carries the active scene version it was derived
        # against. The batch id is the beat's structural scene anchor; fail loud rather
        # than stamp a None/empty batch. The active version comes from that batch's
        # manifest (1 for a flat batch with no manifest). Additive — existing keys kept.
        _batch_id = beat.beat_metadata.get("scene_id")
        if not _batch_id:
            raise ValueError(
                f"_build_workflow_for_beat: beat {beat.beat_id!r} is missing "
                "beat_metadata['scene_id'] — cannot stamp scene-version provenance "
                "(fail-loud rather than stamp a None/empty batch_id)"
            )
        # NOTE(review): the pre-dispatch recheck in _dispatch_one_beat wraps
        # load_manifest in `except FileNotFoundError`; this call does not.
        # Confirm load_manifest returns None (rather than raising) for a flat
        # batch with no manifest, otherwise the "1" default below is unreachable.
        _manifest = load_manifest(self.project, self.episode, _batch_id)
        global_provenance["batch_id"] = _batch_id
        global_provenance["scene_version"] = (
            active_version(_manifest) if _manifest is not None else 1
        )
        global_provenance["beat_id"] = beat.beat_id
        if reroll_note is not None:
            global_provenance["reroll_note"] = reroll_note
        return Workflow(
            workflow_id=f"{beat.beat_id}_take_{take_index}",
            steps=steps,
            global_provenance=global_provenance,
        )

    def _classify_take_failure(
        self, take, beat: Beat, beat_index: int,
    ) -> tuple[tuple[FailureMode, float], PassResult, CoveragePass]:
        """Classify a failed Take via synthetic strategy-engine inputs.

        Bridges the Take/Beat into a PassResult + CoveragePass, runs
        detect_failure_mode on them, and — when the failure mode is
        CONTENT_FILTER_HARD_BLOCK — appends a flag label to the filter-safety
        labels file.

        Returns ((failure_mode, confidence), pass_result, coverage_pass) so
        the caller can reuse the synthetic objects without rebuilding them.
        """
        from recoil.pipeline.orchestrator.strategy_registry import detect_failure_mode

        pass_result = self._take_to_pass_result(take, beat)
        coverage_pass = self._beat_to_coverage_pass(beat, beat_index)
        verdict = detect_failure_mode(pass_result, coverage_pass)

        hard_blocked = verdict[0] is FailureMode.CONTENT_FILTER_HARD_BLOCK
        if hard_blocked:
            # Record the blocked take so it can be reviewed later.
            flag_label = build_flag_label(
                take,
                pass_result,
                project=self.project,
                episode=self.episode,
                shot_id=beat.beat_id,
            )
            jsonl_append_locked(FILTER_SAFETY_LABELS_PATH, flag_label)

        return verdict, pass_result, coverage_pass

    def _beat_to_coverage_pass(
        self, beat: Beat, beat_index: int,
    ) -> CoveragePass:
        """Construct a CoveragePass from beat metadata for strategy engine.

        This is the inbound bridge: EpisodeRunner speaks Beat, StrategyEngine
        speaks CoveragePass. The synthetic pass has sparse fields (no
        arc_preamble, minimal element_config on first pass) — strategies
        that read absent fields return applicable=False diffs.

        Shots in ``beat_metadata`` may be plain dicts (the JSON-serialized
        form) or attribute-style shot objects; both are read the same way.
        A field that is absent OR explicitly None falls back to its default
        (shot_type "MS", duration 5s, empty prompt/location). Serialized
        shots carry every field key, so without the None handling an unset
        optional field would bypass the default and ``int(None)`` would raise
        for ``duration_s``.

        Args:
            beat: Beat whose ``beat_metadata`` supplies "shot", "modality"
                and (for r2v_multi) "batch_shots".
            beat_index: Used only to build the synthetic ``pass_id``.

        Returns:
            A CoveragePass with one segment per batch shot (r2v_multi), one
            segment for the head shot otherwise, or no segments when the beat
            carries neither.
        """
        from recoil.pipeline.orchestrator.coverage_planner import CoveragePass, CoverageSegment

        def _field(source, key: str, default):
            """Read ``key`` from a dict or object; None counts as missing."""
            if isinstance(source, dict):
                value = source.get(key)
            else:
                value = getattr(source, key, None)
            return default if value is None else value

        def _segment(index: int, source, fallback_id: str) -> CoverageSegment:
            """Build one CoverageSegment from a dict- or object-style shot."""
            return CoverageSegment(
                segment_index=index,
                source_shot_id=_field(source, "shot_id", fallback_id),
                shot_type=_field(source, "shot_type", "MS"),
                duration_s=int(_field(source, "duration_s", 5)),
                prompt=_field(source, "description", ""),
            )

        shot_data = beat.beat_metadata.get("shot") or {}
        modality = beat.beat_metadata.get("modality", "video_i2v")
        batch_shots_data = (
            beat.beat_metadata.get("batch_shots")
            if modality == "r2v_multi"
            else None
        )

        segments: list[CoverageSegment] = []
        if batch_shots_data:
            # segment_index keeps the shot's position in the batch so it stays
            # aligned with payload["batch_shots"] when a diff is applied later.
            segments = [
                _segment(i, s, f"seg_{i}")
                for i, s in enumerate(batch_shots_data)
                if s is not None
            ]
        elif shot_data:
            segments.append(_segment(0, shot_data, beat.beat_id))

        # Focus character = char_id of the head shot's first character, if any.
        focus_char = ""
        chars = _field(shot_data, "characters", [])
        if chars:
            focus_char = _field(chars[0], "char_id", "")

        first_id = segments[0].source_shot_id if segments else beat.beat_id
        last_id = segments[-1].source_shot_id if segments else beat.beat_id

        return CoveragePass(
            pass_id=f"{beat.beat_id}_synth_{beat_index}",
            episode_id=self.episode,
            shot_range=(first_id, last_id),
            camera_side="A",
            label=f"ep_runner_{beat.beat_id}",
            focus_character=focus_char,
            pass_type="character" if focus_char else "env",
            segments=segments,
            location_id=_field(shot_data, "location_id", ""),
        )

    @staticmethod
    def _take_to_pass_result(take, beat: Beat) -> PassResult:
        """Summarize a failed Take as a PassResult for the strategy engine.

        Walks the take's workflow steps and, for every step that has a
        receipt with a run_result, collects: error messages of unsuccessful
        results (joined with " | "), the first output path seen, the summed
        ``cost_usd`` metadata, and the last non-empty ``model`` metadata.
        The result is always marked ``success=False``.
        """
        from recoil.execution.step_types import PassResult

        failures: list[str] = []
        first_output: str | None = None
        spent_usd = 0.0
        reported_model = ""

        for wf_step in take.workflow.steps:
            receipt = getattr(wf_step, "receipt", None)
            if receipt is None:
                continue
            outcome = getattr(receipt, "run_result", None)
            if outcome is None:
                continue

            if not outcome.success:
                if outcome.error:
                    failures.append(outcome.error)

            produced = getattr(outcome, "output_path", None)
            if produced and first_output is None:
                first_output = str(produced)

            meta = getattr(outcome, "metadata", None) or {}
            spent_usd += float(meta.get("cost_usd", 0.0) or 0.0)
            step_model = meta.get("model")
            if step_model:
                reported_model = str(step_model)

        combined_error = None
        if failures:
            combined_error = " | ".join(failures)

        return PassResult(
            pass_id=beat.beat_id,
            success=False,
            video_path=first_output,
            cost_usd=spent_usd,
            error=combined_error,
            model=reported_model,
            take_index=len(beat.takes) - 1,
        )

    @staticmethod
    def _apply_strategy_diff(
        diff: StrategyDiff,
        wf: Workflow,
        original_pass: CoveragePass | None,
    ) -> None:
        """Overlay a StrategyDiff onto every dict payload of a Workflow.

        Outbound bridge between the two vocabularies: the StrategyEngine
        describes a retry as a modified CoveragePass, while EpisodeRunner
        dispatches Workflows. Each step whose payload is a dict is mutated in
        place; non-dict payloads are left alone. Retry provenance is stamped
        on each touched payload and on the workflow itself.

        Intended to run after _build_workflow_for_beat has produced the base
        workflow and before the Take executes.
        """
        from recoil.pipeline.orchestrator.strategy_registry import RetryStrategyName

        overlay_keys = (
            "tier", "seed", "resolution", "format_mode",
            "anchor_duration_s", "regen_previz_for_segment",
        )
        hint_keys = ("seed", "tier")
        frame_keys = (
            "start_frame_url", "start_frame", "image_url",
            "blueprint_image_path",
        )

        target = diff.modified_pass
        overrides = target.generation_config or {}

        for wf_step in wf.steps:
            body = wf_step.payload
            if not isinstance(body, dict):
                continue

            # Generation config: a value overwrites, an explicit None removes,
            # an absent key leaves the payload untouched.
            for name in overlay_keys:
                if name not in overrides:
                    continue
                override = overrides[name]
                if override is None:
                    body.pop(name, None)
                else:
                    body[name] = override

            # REC-38: seed and tier must also travel in provider_hints, the
            # only channel the runner/adapter layer reads; the top-level
            # copies above do not survive the VideoRunner key whitelist.
            for hint in hint_keys:
                hint_value = overrides.get(hint)
                if hint_value is not None:
                    body.setdefault("provider_hints", {})[hint] = hint_value

            # Element config (ref image settings) is merged, not replaced.
            if target.element_config:
                body.setdefault("element_config", {}).update(target.element_config)

            # Arc preamble: prepend only the part the strategy added on top
            # of the original pass's preamble, and only once.
            if target.arc_preamble:
                if original_pass is not None:
                    baseline = original_pass.arc_preamble or ""
                else:
                    baseline = ""
                prefix = target.arc_preamble
                if baseline and prefix.endswith(baseline):
                    prefix = prefix[: -len(baseline)]
                if prefix.strip():
                    current_prompt = body.get("prompt", "")
                    if not current_prompt.startswith(prefix):
                        body["prompt"] = prefix + current_prompt

            # Switching i2v -> r2v with no blueprint: drop every start-frame key.
            drop_start_frames = (
                target.blueprint_image_path is None
                and diff.strategy_name == RetryStrategyName.SWITCH_I2V_TO_R2V
            )
            if drop_start_frames:
                for frame_key in frame_keys:
                    body.pop(frame_key, None)

            # Total duration follows the modified segments when positive.
            if target.segments:
                seconds = sum(seg.duration_s for seg in target.segments)
                if seconds > 0:
                    body["duration"] = seconds

            # r2v_multi: push per-segment prompt/duration into the matching
            # batch shot, position by position.
            shots = body.get("batch_shots")
            if shots and target.segments:
                for position, seg in enumerate(target.segments):
                    if position >= len(shots):
                        break
                    entry = shots[position]
                    if isinstance(entry, dict):
                        entry["description"] = seg.prompt
                        entry["duration_s"] = seg.duration_s

            # Per-payload provenance stamp.
            body["_retry_strategy"] = diff.strategy_name.value
            body["_retry_cost_tier"] = diff.cost_tier

        # Workflow-level provenance.
        provenance = wf.global_provenance
        provenance["retry_strategy"] = diff.strategy_name.value
        provenance["retry_cost_tier"] = diff.cost_tier
        provenance["retry_changes"] = diff.changes

    async def _dispatch_one_beat(
        self,
        beat: Beat,
        scene: Scene,
        expected_version: int,
        dry_run: bool,
        force_new_take: bool = False,
        seed: int | None = None,
        make_primary: bool = False,
        strategy_override: str | None = None,
        reroll_note: str | None = None,
    ) -> None:
        """Run Takes sequentially within a Beat. Up to max_takes attempts.

        When a StrategyEngine is provided, failed Takes trigger failure
        classification -> strategy selection -> workflow mutation. Without
        a StrategyEngine, retries are blind (same workflow, existing behavior).
        """
        async with self._semaphore:
            try:
                beat_index = scene.beats.index(beat)
            except ValueError:
                beat_index = 0
            reroll_grouping = (
                self._primary_take_grouping_for_reroll(beat.primary_take)
                if force_new_take and beat.primary_take is not None
                else None
            )

            # ── Per-beat retry state (NEW) ──────────────────────────
            _strategies_tried: list[str] = []
            _cumulative_retry_cost: float = 0.0
            _original_pass: Optional[CoveragePass] = None
            _current_pass: Optional[CoveragePass] = None
            _active_diff: Optional[StrategyDiff] = None
            _strategy_exhausted = False
            _reroll_attempted = False

            while (
                (force_new_take or not beat.is_exhausted)
                and (force_new_take or not beat.approved)
                and not _strategy_exhausted
                and (
                    beat.primary_take is None
                    or (force_new_take and not _reroll_attempted)
                )
            ):
                # ── Budget gate (UNCHANGED) ─────────────────────────
                est = 0.0
                if not dry_run:
                    est = self._estimate_take_cost(beat)
                    if est > 0.0 and self.budget_guard.would_exceed(est):
                        raise BudgetExhaustedError(
                            beat_id=beat.beat_id,
                            estimated=est,
                            spent=self.budget_guard.spent,
                            budget=self.budget_usd,
                            reserved=self.budget_guard.reserved,
                        )
                    elif est <= 0.0 and self.budget_guard.spent >= self.budget_usd:
                        raise BudgetExhaustedError(
                            self.budget_usd - self.budget_guard.spent
                        )

                # ── Build workflow (MODIFIED) ───────────────────────
                # Phase 4c: PayloadBuildError is caught HERE (before the
                # dry-run fast path) so build failures surface in dry-run too,
                # not just in live dispatch. This is the point of the fix.
                take_index = len(beat.takes)
                if force_new_take:
                    _reroll_attempted = True
                prior_inputs_fingerprint = beat.beat_metadata.get("inputs_fingerprint")
                try:
                    wf = self._build_workflow_for_beat(
                        beat,
                        take_index,
                        beat_index=beat_index,
                        grouping_override=reroll_grouping,
                        strategy_override=strategy_override,
                        reroll_note=reroll_note,
                    )
                except PayloadBuildError as pbe:
                    logger.error(
                        "_dispatch_one_beat: payload build error for beat=%s "
                        "(take_index=%d): %s",
                        beat.beat_id, take_index, pbe,
                    )
                    ops_log.write({
                        "event": "take_payload_build_error",
                        "scene_id": scene.scene_id,
                        "beat_id": beat.beat_id,
                        "take_index": take_index,
                        "error": str(pbe),
                    })
                    # Create a minimal take so the beat advances its take count
                    # and is not stuck in an infinite retry loop.
                    _stub_wf = Workflow(
                        workflow_id=f"{beat.beat_id}_take_{take_index}_buildfail",
                        steps=[WorkflowStep(
                            step_id="video",
                            modality="video_i2v",
                            payload={"shot_id": beat.beat_id},
                        )],
                        global_provenance={"shot_id": beat.beat_id,
                                           "episode": self.episode},
                    )
                    _fail_take = beat.new_take(workflow=_stub_wf)
                    _fail_take.status = "failed"
                    _fail_take.take_metadata["build_error"] = str(pbe)
                    self._persist_active_status(
                        scene, expected_version=expected_version, dry_run=dry_run
                    )
                    # Release the would_exceed reservation — nothing was
                    # dispatched. Leaving it reserved leaks budget on every
                    # build failure and eventually false-fires
                    # BudgetExhaustedError (REC-122).
                    if est > 0.0:
                        self.budget_guard.release(est)
                    continue
                if force_new_take and prior_inputs_fingerprint is not None:
                    beat.beat_metadata["inputs_fingerprint"] = prior_inputs_fingerprint

                # ── REC-236 ref-integrity preflight (fail CLOSED) ───
                # Runs AFTER ref resolution (the wf payload carries this beat's
                # resolved start_frame / reference_images) but BEFORE the take is
                # created/persisted and BEFORE the paid submit — so a broken ref
                # releases the budget reservation and fails closed WITHOUT leaving
                # a phantom running take or leaked budget. Scope = THIS beat's
                # resolved refs only; an unrelated broken ref elsewhere does not
                # block. A broken ref (missing / not a file / too-small-or-no-magic,
                # incl. an un-materialized git-lfs pointer) means no money is spent
                # rendering against a stub (the REC-125 class). dry-run is never
                # gated (it spends nothing and the estimate must still compute).
                if not dry_run:
                    _resolved_refs = _resolved_ref_paths_from_workflow(wf)
                    if _resolved_refs:
                        _broken = check_paths(_resolved_refs)
                        if _broken:
                            if est > 0.0:
                                self.budget_guard.release(est)
                            ops_log.write({
                                "event": "ref_integrity_blocked",
                                "scene_id": scene.scene_id,
                                "beat_id": beat.beat_id,
                                "take_index": take_index,
                            })
                            raise RefIntegrityError(beat.beat_id, _broken)

                for step in wf.steps:
                    payload = step.payload if isinstance(step.payload, dict) else None
                    grouping_payload = (
                        payload.get("grouping") if payload is not None else None
                    )
                    if (
                        isinstance(grouping_payload, dict)
                        and grouping_payload.get("strategy") == "coverage"
                        and grouping_payload.get("ordinal") is not None
                    ):
                        payload.setdefault(
                            "pass_counter",
                            int(grouping_payload["ordinal"]),
                        )

                # Apply strategy mutations to workflow payload (NEW)
                if _active_diff is not None:
                    self._apply_strategy_diff(_active_diff, wf, _original_pass)

                if force_new_take:
                    for step in wf.steps:
                        if not isinstance(step.payload, dict):
                            continue
                        step.payload["forced_take_number"] = take_index + 1
                        if seed is not None:
                            step.payload.setdefault("provider_hints", {})["seed"] = seed

                take = beat.new_take(workflow=wf)
                if force_new_take:
                    take.take_metadata["force_new_take"] = True

                # ── Dispatch log + pre-save (UNCHANGED) ─────────────
                ops_log.write({
                    "event": "take_dispatched",
                    "scene_id": scene.scene_id,
                    "beat_id": beat.beat_id,
                    "take_index": take_index,
                })
                if not dry_run:
                    take.status = "running"
                    self._persist_active_status(
                        scene, expected_version=expected_version, dry_run=dry_run
                    )

                # ── Dry-run fast path (UNCHANGED) ───────────────────
                if dry_run:
                    take.status = "succeeded"
                    for step in take.workflow.steps:
                        step.status = "succeeded"
                    take.take_metadata["dry_run"] = True
                    if not force_new_take or make_primary:
                        beat.primary_take_id = take.take_id
                    # Dry-run is read-only w.r.t. scene state — do NOT persist the
                    # simulated take (REC-100); the estimate is computed in-memory.
                    ops_log.write({
                        "event": "take_completed_dry",
                        "scene_id": scene.scene_id,
                        "beat_id": beat.beat_id,
                        "take_index": take_index,
                    })
                    continue

                # ── Execute (UNCHANGED) ─────────────────────────────
                ctx = DispatchContext(
                    caller_id="episode_runner",
                    step_runner=self._step_runner,
                    project=self.project, episode=int(self.episode.split("_")[-1])
                        if self.episode.startswith("ep_") else 1,
                )
                # Recheck immediately before paid dispatch. A microscopic window remains
                # after this read; closing it requires a cross-dispatch lease (out of scope).
                try:
                    current_manifest = load_manifest(
                        self.project, self.episode, scene.scene_id
                    )
                except FileNotFoundError:
                    current_manifest = None
                current_version = (
                    active_version(current_manifest)
                    if current_manifest is not None else 1
                )
                if current_version != expected_version:
                    raise SceneVersionConflictError(
                        scene.scene_id, expected_version, current_version
                    )
                try:
                    if force_new_take:
                        from recoil.pipeline.core.dispatch import register_default_runners

                        register_default_runners(self._step_runner, force=True)
                    await asyncio.to_thread(take.execute, context=ctx)
                except Exception as exc:           # noqa: BLE001
                    if force_new_take:
                        take.status = "failed"
                        for step in take.workflow.steps:
                            step.status = "failed"
                    if est > 0.0:
                        self.budget_guard.release(est)
                    _active_diff = None
                    ops_log.write({
                        "event": "take_completed", "status": "failed",
                        "scene_id": scene.scene_id, "beat_id": beat.beat_id,
                        "take_index": take_index, "error": str(exc),
                    })
                    self._persist_active_status(
                        scene, expected_version=expected_version, dry_run=dry_run
                    )
                    if force_new_take:
                        break
                    continue

                # ── Cost accounting (UNCHANGED) ─────────────────────
                step_cost = 0.0
                failed_but_billed_cost = 0.0
                for step in take.workflow.steps:
                    r = getattr(step, "receipt", None)
                    if r is None:
                        continue
                    rr = r.run_result if r.run_result else None
                    md = (rr.metadata or {}) if rr else {}
                    observed_cost = float(md.get("cost_usd", 0.0) or 0.0)
                    if rr is not None and rr.success:
                        step_cost += observed_cost
                    elif rr is not None:
                        # Failed steps NEVER feed step_cost (charging a failure
                        # as kind="succeeded" was the phantom-spend bug,
                        # REC-122). They contribute only verified billed spend:
                        # the provider's billed hint, the provider's own
                        # reported cost, or the pattern-matched estimate.
                        err_str = (rr.error or "").lower()
                        billed_hint = float(
                            md.get("inference_billed_usd", 0.0) or 0.0
                        )
                        if billed_hint > 0.0:
                            failed_but_billed_cost += billed_hint
                        elif observed_cost > 0.0:
                            failed_but_billed_cost += observed_cost
                        elif any(
                            pat in err_str
                            for pat in _FAILED_BUT_BILLED_PATTERNS
                        ):
                            failed_but_billed_cost += est if est > 0.0 else 0.0
                if step_cost > 0.0:
                    self.budget_guard.charge(
                        step_cost,
                        reserved_amount=est if est > 0.0 else None,
                        kind="succeeded",
                    )
                    if failed_but_billed_cost > 0.0:
                        # Mixed take: a successful step plus a failed-but-
                        # billed step. The reservation was already debited
                        # by the succeeded charge — debit 0 here so the
                        # billed failure still lands in spent (gate finding
                        # on REC-122: the old elif silently dropped it).
                        self.budget_guard.charge(
                            failed_but_billed_cost,
                            reserved_amount=0.0,
                            kind="failed_but_billed",
                        )
                elif failed_but_billed_cost > 0.0:
                    self.budget_guard.charge(
                        failed_but_billed_cost,
                        reserved_amount=est if est > 0.0 else None,
                        kind="failed_but_billed",
                    )
                else:
                    if est > 0.0:
                        self.budget_guard.release(est)

                ops_log.write({
                    "event": "take_completed", "status": take.status,
                    "scene_id": scene.scene_id, "beat_id": beat.beat_id,
                    "take_index": take_index,
                })

                # ── Phase 4b: no-reason-failure invariant (backstop) ────
                # A take must never settle into "not succeeded" with every
                # step receipt carrying an empty/None error.  If that
                # invariant is violated a synthetic error is injected so
                # downstream classification and logging have something to
                # work with instead of a silent blank failure.
                if take.status != "succeeded":
                    _all_empty_errors = all(
                        not (
                            getattr(getattr(step, "receipt", None), "run_result", None)
                            and getattr(
                                getattr(step.receipt, "run_result", None), "error", None
                            )
                        )
                        for step in take.workflow.steps
                    )
                    if _all_empty_errors:
                        _synthetic_error = (
                            f"Take {take.take_id} settled into status="
                            f"{take.status!r} with no error on any step receipt "
                            f"(Phase 4b invariant triggered)"
                        )
                        logger.warning(
                            "_dispatch_one_beat: %s (beat=%s)",
                            _synthetic_error, beat.beat_id,
                        )
                        take.take_metadata.setdefault(
                            "synthetic_error", _synthetic_error
                        )
                        ops_log.write({
                            "event": "take_no_reason_failure",
                            "scene_id": scene.scene_id,
                            "beat_id": beat.beat_id,
                            "take_index": take_index,
                            "synthetic_error": _synthetic_error,
                        })

                # ── Success (UNCHANGED) ─────────────────────────────
                if take.status == "succeeded":
                    if not force_new_take or make_primary:
                        beat.primary_take_id = take.take_id
                        ops_log.write({
                            "event": "beat_primary_selected",
                            "scene_id": scene.scene_id, "beat_id": beat.beat_id,
                            "take_id": take.take_id,
                        })

                # ── Failure -> strategy selection (NEW) ──────────────
                elif force_new_take:
                    _strategy_exhausted = True
                elif self._strategy_engine is not None:
                    try:
                        from recoil.pipeline.orchestrator.strategy_registry import RetryStrategyName
                        (failure_mode, confidence), synthetic_result, synthetic_pass = (
                            self._classify_take_failure(take, beat, beat_index)
                        )
                        if _original_pass is None:
                            _original_pass = synthetic_pass
                        _current_pass = (
                            _active_diff.modified_pass
                            if _active_diff is not None
                            else synthetic_pass
                        )

                        diff = self._strategy_engine.select_and_apply(
                            failure_mode=failure_mode,
                            coverage_pass=_current_pass,
                            pass_result=synthetic_result,
                            already_tried=_strategies_tried,
                            original_pass=_original_pass,
                            cumulative_retry_cost=_cumulative_retry_cost,
                            cost_policy=self._retry_cost_policy,
                        )

                        if diff is None or (
                            diff.strategy_name
                            == RetryStrategyName.ESCALATE_TO_HUMAN
                        ):
                            _strategy_exhausted = True
                            ops_log.write({
                                "event": "beat_strategy_exhausted",
                                "scene_id": scene.scene_id,
                                "beat_id": beat.beat_id,
                                "failure_mode": failure_mode.value,
                                "strategies_tried": _strategies_tried,
                                "cumulative_retry_cost": round(
                                    _cumulative_retry_cost, 4
                                ),
                            })
                        else:
                            _active_diff = diff
                            _strategies_tried.append(
                                diff.strategy_name.value
                            )
                            _cumulative_retry_cost += diff.estimated_cost_usd
                            ops_log.write({
                                "event": "retry_strategy_selected",
                                "scene_id": scene.scene_id,
                                "beat_id": beat.beat_id,
                                "take_index": take_index,
                                "failure_mode": failure_mode.value,
                                "failure_confidence": round(confidence, 2),
                                "strategy": diff.strategy_name.value,
                                "cost_tier": diff.cost_tier,
                                "estimated_cost_usd": round(
                                    diff.estimated_cost_usd, 4
                                ),
                                "changes": diff.changes,
                                "cumulative_retry_cost": round(
                                    _cumulative_retry_cost, 4
                                ),
                            })
                            # Cost warning when approaching cap
                            if _cumulative_retry_cost >= self._retry_cost_policy.warn_threshold_usd:
                                ops_log.write({
                                    "event": "retry_cost_warning",
                                    "beat_id": beat.beat_id,
                                    "cumulative_retry_cost": round(
                                        _cumulative_retry_cost, 4
                                    ),
                                    "warn_threshold": self._retry_cost_policy.warn_threshold_usd,
                                    "max_allowed": self._retry_cost_policy.max_retry_spend_usd,
                                })
                    except Exception as exc:               # noqa: BLE001
                        _active_diff = None
                        logger.warning(
                            "Strategy engine error for beat %s; "
                            "falling back to blind retry: %s",
                            beat.beat_id, exc,
                        )
                        ops_log.write({
                            "event": "strategy_engine_error",
                            "scene_id": scene.scene_id,
                            "beat_id": beat.beat_id,
                            "error": str(exc),
                        })
                # else: no strategy engine -> blind retry (existing behavior)

                self._persist_active_status(
                    scene, expected_version=expected_version, dry_run=dry_run
                )
                if force_new_take:
                    break

            # ── Post-loop exhaustion log (MODIFIED) ─────────────────
            if (
                (beat.is_exhausted or _strategy_exhausted)
                and beat.primary_take is None
            ):
                ops_log.write({
                    "event": "beat_exhausted",
                    "scene_id": scene.scene_id,
                    "beat_id": beat.beat_id,
                    "exhaustion_reason": (
                        "strategy_exhausted"
                        if _strategy_exhausted
                        else "max_takes_reached"
                    ),
                    "strategies_tried": _strategies_tried or None,
                })

    # ── Public async entry points ────────────────────────────────

    async def run_scene(
        self,
        scene: Scene,
        dry_run: bool = False,
        force_new_take: bool = False,
        seed: int | None = None,
        make_primary: bool = False,
        reroll_beat_id: str | None = None,
        strategy_override: str | None = None,
        reroll_note: str | None = None,
        allow_cleared_stale: bool = False,
        expected_version: int | None = None,
    ) -> Scene:
        """Dispatch the eligible Beats of ``scene`` as parallel tasks.

        The Scene is mutated in place and the very same object is returned.

        Flow:
          1. Resolve ``expected_version`` when the caller did not thread it.
          2. When ``force_new_take`` is set, run the reroll preflight; a
             dry-run reroll returns right after the preflight.
          3. Select the dispatchable beats, run the board gate and log
             ``sequence_started``.
          4. Spawn one asyncio task per eligible beat, each delegating to
             ``_dispatch_one_beat``, and await them together.
          5. Persist the active status (on success AND on failure), then log
             ``sequence_complete`` on the success path only.

        REC-231 Phase 4: ``expected_version`` is the active version the caller
        loaded this scene from; in-place take/board STATUS saves target THAT
        version body via the structure-guarded writer. When it is None it is
        resolved here against the active manifest (and re-checked under the
        lock at save time, so a conform mid-run raises a conflict rather than
        producing a stale write).

        Parameters:
          - ``dry_run``: forwarded to beat dispatch and to status persistence.
          - ``force_new_take``: reroll mode; requires ``reroll_beat_id``. Only
            the beat whose id equals ``reroll_beat_id`` is dispatched with the
            force flag.
          - ``allow_cleared_stale``: forwarded to ``_preflight_reroll``.
          - ``seed``, ``make_primary``, ``strategy_override``, ``reroll_note``:
            forwarded unchanged to ``_dispatch_one_beat``.

        Raises:
          RerollPreflightError: ``force_new_take`` without ``reroll_beat_id``.
          Whatever the preflight helpers, the board gate, a beat task or the
          final status save raise is propagated to the caller.
        """
        if expected_version is None:
            expected_version = self._active_version_for_scene(scene)

        # Guard: a reroll without a target beat is a caller error.
        if force_new_take and reroll_beat_id is None:
            raise RerollPreflightError(
                "reroll_preflight_required",
                "force_new_take requires a reroll target beat.",
            )

        if force_new_take:
            target = self._preflight_reroll(
                scene,
                target_beat_id=reroll_beat_id,
                allow_cleared_stale=allow_cleared_stale,
            )
            target_id = target.beat_id
            self._preflight_reroll_no_extra_dispatchable_beats(scene, target_id)

            # Run the non-mutating validations with the reroll target
            # temporarily removed from the scene; the full beat list is
            # restored even if a validation raises.
            all_beats = scene.beats
            scene.beats = [b for b in all_beats if b.beat_id != target_id]
            try:
                self.invalidate_phantom_succeeded_takes(scene, mutate=False)
                self.revalidate_succeeded_fingerprints(scene, mutate=False)
            finally:
                scene.beats = all_beats

            # A dry-run reroll ends once the preflight has passed.
            if dry_run:
                return scene

        dispatchable = _dispatchable_beats_for_scene(
            scene,
            force_new_take=force_new_take,
            reroll_beat_id=reroll_beat_id,
        )
        _preflight_board_gate(
            project=self.project,
            episode=self.episode,
            beats=dispatchable,
        )
        ops_log.write({"event": "sequence_started", "scene_id": scene.scene_id})

        async def _run_beat(beat):
            # Only the reroll target itself is dispatched as a forced take.
            is_reroll_target = force_new_take and beat.beat_id == reroll_beat_id
            await self._dispatch_one_beat(
                beat,
                scene,
                expected_version,
                dry_run,
                force_new_take=is_reroll_target,
                seed=seed,
                make_primary=make_primary,
                strategy_override=strategy_override,
                reroll_note=reroll_note,
            )

        beat_tasks = [
            asyncio.create_task(_run_beat(beat)) for beat in dispatchable
        ]
        try:
            await asyncio.gather(*beat_tasks)
        except BaseException:
            # One failure (budget exhaustion or anything else) stops the whole
            # scene: cancel the still-running siblings so parallel beats do
            # not keep charging, wait for them to settle, then re-raise.
            for task in beat_tasks:
                if task.done():
                    continue
                task.cancel()
            await asyncio.gather(*beat_tasks, return_exceptions=True)
            raise
        finally:
            # Status is saved whether the beats succeeded or not.
            self._persist_active_status(
                scene, expected_version=expected_version, dry_run=dry_run
            )

        completion_event = {
            "event": "sequence_complete",
            "scene_id": scene.scene_id,
            "status": scene.status,
        }
        ops_log.write(completion_event)
        return scene

    async def run_episode_batches(
        self,
        canonical_plan,
        dry_run: bool = False,
        force_single_batch: bool = False,
        pass_id: str | None = None,
        grouping: str = "continuity",
        selected_coverage_passes: list[CoveragePass] | None = None,
        tier_map: dict[str, int] | None = None,
        wildcard_override: bool | None = None,
        force_new_take: bool = False,
        seed: int | None = None,
        make_primary: bool = False,
        strategy_override: str | None = None,
        derive_only: bool = False,
        dedup_candidate: bool = False,
        only_scene_ids: set[str] | None = None,
    ) -> list | dict:
        """Walk CanonicalPlan.shots → batches → Beats.

        For each Batch:
          - If batch.below_threshold: emit one Beat per shot (modality
            video_i2v).
          - Else: emit ONE Beat with modality r2v_multi and the full
            batch attached in beat_metadata.
        All Beats for a single Batch share a synthetic Scene; the
        Scene.scene_id is the Batch.batch_id.
        """
        from recoil.pipeline.core.scene_version_store import SceneVersionStore

        selected_coverage_passes = list(selected_coverage_passes or [])

        if not force_new_take:
            # Idempotently initialize Scene JSONs from plan; pass max_takes so
            # a CLI override propagates to existing scenes on resume.
            init_scenes_from_plan(
                self.project,
                self.episode,
                self.plan,
                max_takes=self.max_takes,
                persist=(not dry_run and only_scene_ids is None),
            )

        if force_single_batch and not pass_id:
            raise ValueError(
                "pass_id is required when force_single_batch=True; "
                "without it _scene_from_group falls back to BATCH_001 "
                "and the cross-pass filename collision returns."
            )

        shots = list(canonical_plan.shots)
        if force_new_take and not shots:
            raise RerollPreflightError(
                "reroll_requires_succeeded_primary",
                "--new-take could not resolve any shots for the --new-take target pass.",
            )
        if not shots:
            if not force_new_take:
                ops_log.write({"event": "no_shots_in_plan",
                               "project": self.project, "episode": self.episode})
            if derive_only:
                return {
                    "derive_only": True,
                    "written": [],
                    "skipped": [],
                    "dry_run": dry_run,
                }
            return []

        resolved_grouping = str(grouping or "continuity")
        if force_single_batch:
            resolved_grouping = "coverage"
            if force_new_take and len(shots) > 6:
                raise RerollPreflightError(
                    "new_take_requires_single_r2v_multi_beat",
                    "--new-take requires one r2v_multi batch target.",
                )
            if not selected_coverage_passes:
                selected_coverage_passes = [
                    self._coverage_pass_from_shots(
                        shots,
                        pass_id=pass_id,
                    )
                ]
        if resolved_grouping == "coverage" and selected_coverage_passes:
            selected_coverage_passes = [
                coverage_pass
                if hasattr(coverage_pass, "segments")
                else self._coverage_pass_from_shots(
                    shots,
                    pass_id=getattr(coverage_pass, "pass_id", pass_id),
                )
                for coverage_pass in selected_coverage_passes
            ]
        logger.info(
            "run_episode_batches: grouping=%s shots=%d selected_coverage_passes=%d",
            resolved_grouping,
            len(shots),
            len(selected_coverage_passes),
        )
        ctx = GroupingContext(
            project=self.project,
            episode=int(str(self.episode).removeprefix("ep_")),
            canonical_plan=canonical_plan,
            selected_coverage_passes=selected_coverage_passes,
            tier_map=dict(tier_map or {}),
            wildcard_override=wildcard_override,
        )
        groups = get_grouping(resolved_grouping).assemble(shots, ctx)
        if force_new_take and len(groups) != 1:
            raise RerollPreflightError(
                "new_take_requires_single_r2v_multi_beat",
                "--new-take requires one r2v_multi batch target.",
            )
        if not force_new_take:
            ops_log.write({"event": "batches_planned",
                           "count": len(groups),
                           "episode": self.episode,
                           "grouping": resolved_grouping})
        scenes = []
        rendered_shot_ids: list[str] = []
        guarded_writes = 0
        skipped: list[tuple[str, str | None]] = []
        written_scene_ids: list[str] = []
        candidate_versions: dict[str, int] = {}
        derive_only_candidate_appended = False
        written_scene_payloads_by_id: dict[str, dict] = {}
        scene_script_spans: dict[str, dict[str, str | None]] = {}
        selected_pass_ids = {
            coverage_pass.pass_id for coverage_pass in selected_coverage_passes
        }

        def _record_written_scene(scene: Scene, group: Group) -> None:
            if dry_run:
                return
            written_scene_payloads_by_id[scene.scene_id] = _scene_manifest_payload(scene)
            scene_script_spans[group.scene_id] = {
                s.shot_id: (s.raw or {}).get("source_text_hash")
                for s in group.shots
            }

        def _stamp_scenes_if_complete() -> None:
            if dry_run:
                return
            if derive_only and derive_only_candidate_appended:
                logger.info(
                    "scenes stamp skipped: derive_only staged candidates without "
                    "moving active scene bodies"
                )
                return
            # REC-231 (audit CRITICAL): a non-derive run can STAGE not_derived candidates
            # (coverage-refresh / topology-drift) that append to written_scene_ids and
            # `continue` WITHOUT dispatching. If every group stages, the completeness
            # predicate below would otherwise pass and stamp stages.scenes as `derived`
            # using candidate payloads even though active_version never moved (the CLI
            # reports requires_conform) — corrupting the freshness SSOT. Any staged
            # candidate this run means the episode is NOT cleanly derived: skip the stamp.
            if not derive_only and candidate_versions:
                logger.info(
                    "scenes stamp skipped: %d not_derived candidate(s) staged without "
                    "dispatch — active version unmoved; operator must re-board + conform",
                    len(candidate_versions),
                )
                return
            raw = getattr(canonical_plan, "raw", {}) or {}
            expected_full = raw.get("total_shots") or len(raw.get("shots") or [])
            full_episode_run = (
                bool(expected_full)
                and len(canonical_plan.shots) == expected_full
                and resolved_grouping != "coverage"
                and not selected_coverage_passes
                and not force_new_take
                and only_scene_ids is None
            )
            reasons: list[str] = []
            if not full_episode_run:
                reasons.append("partial/sliced/coverage/locked")
            if skipped:
                reasons.append("locked")
            if len(written_scene_ids) != len(groups):
                reasons.append("partial")
            if not (
                full_episode_run
                and skipped == []
                and len(written_scene_ids) == len(groups)
            ):
                logger.info(
                    "scenes stamp skipped: partial/sliced/coverage/locked (%s)",
                    ", ".join(reasons) or "predicate failed",
                )
                return
            derivation_manifest.stamp_stage(
                self.project,
                int(str(self.episode).removeprefix("ep_")),
                "scenes",
                kind="derived",
                content_sha=content_sha(written_scene_payloads_by_id),
                structural_sha=None,
                source={"plan_structural_sha": plan_structural_sha(canonical_plan.raw)},
                builder="episode_runner.scenes",
                extra={
                    "scene_ids": sorted(written_scene_ids),
                    "shot_script_spans": scene_script_spans,
                },
            )

        for group in groups:
            if only_scene_ids is not None and group.scene_id not in only_scene_ids:
                continue
            # ORDERING: capture fresh BEFORE loading from disk.  If we loaded
            # first and then reassigned `scene = loaded`, the _scene_from_group
            # call below with pass_id would never run and the stale metadata
            # bug would silently return.
            fresh = self._scene_from_group(group)
            # REC-231 Phase 4: the merge loads the active body + its version atomically
            # so the status saves below target the version the run loaded.
            scene = fresh
            loaded = None
            loaded_version = 1
            loaded_legacy_single_r2v = False
            reused_loaded_scene = False
            metadata_refreshes = {}
            coverage_pass_refresh = False
            topology_drifted = False
            registered_body_missing_structural_metadata = False
            try:
                loaded, loaded_version = load_scene_active_with_version(
                    self.project, self.episode, fresh.scene_id
                )
                loaded_has_manifest = load_manifest(self.project, self.episode, fresh.scene_id) is not None
                coverage_pass_refresh = (
                    bool(selected_pass_ids)
                    and not force_new_take
                    and group.identity.strategy == "coverage"
                    and any(
                        beat.beat_id == f"{pass_id}__cov"
                        for pass_id in selected_pass_ids
                        for beat in fresh.beats
                    )
                )
                loaded_ids = [b.beat_id for b in loaded.beats]
                fresh_ids = [b.beat_id for b in fresh.beats]
                loaded_legacy_single_r2v = (
                    force_new_take
                    and len(loaded.beats) == 1
                    and len(fresh.beats) == 1
                    and loaded.beats[0].beat_metadata.get("modality") == "r2v_multi"
                    and fresh.beats[0].beat_metadata.get("modality") == "r2v_multi"
                )
                # REC-231 Phase 7: the lock-as-clobber-skip preflight is RETIRED.
                # `scene.locked` no longer gates re-derivation — a re-derive appends an
                # immutable candidate version (the `derive_only` branch below) and the
                # active/locked body is never overwritten, so a locked batch flows through
                # the same topology logic as any other (lock is inert UX metadata now).
                if loaded_ids == fresh_ids or loaded_legacy_single_r2v or (
                    force_new_take and set(fresh_ids).issubset(set(loaded_ids))
                ):
                    if (
                        force_new_take
                        and loaded_ids != fresh_ids
                        and not loaded_legacy_single_r2v
                    ):
                        extra_r2v_loaded = [
                            b.beat_id for b in loaded.beats
                            if b.beat_id not in fresh_ids
                            and b.beat_metadata.get("modality") == "r2v_multi"
                        ]
                        if extra_r2v_loaded:
                            raise RerollPreflightError(
                                "new_take_requires_single_r2v_multi_beat",
                                "--new-take target scene topology has extra persisted r2v_multi beats.",
                            )
                    scene = loaded                       # reuse lineage/takes
                    reused_loaded_scene = True
                    fb = {b.beat_id: b for b in fresh.beats}
                    if loaded_legacy_single_r2v:
                        fb[loaded.beats[0].beat_id] = fresh.beats[0]
                    for lb in scene.beats:
                        f = fb.get(lb.beat_id)
                        if f:
                            refresh = {
                                k: f.beat_metadata[k]
                                for k in (
                                    "shot",
                                    "batch_shots",
                                    "scene_id",
                                    "grouping",
                                    "generation_config",
                                    "element_config",
                                    "modality",
                                    "batch_summary",
                                    "prompt_directive",
                                )
                                if k in f.beat_metadata
                            }
                            if loaded_has_manifest and any(
                                k not in (lb.beat_metadata or {}) for k in refresh
                            ):
                                registered_body_missing_structural_metadata = True
                            if coverage_pass_refresh:
                                continue
                            elif force_new_take:
                                metadata_refreshes[lb.beat_id] = refresh
                            else:
                                # REC-235 fail-closed guard: `refresh` carries `shot`
                                # wholesale incl. `raw`. A same-topology shot.raw/script-only
                                # content change on a LIVE (dispatching) run would be applied
                                # in-memory and dispatched while Writer 1 ignores `raw`
                                # (canonical_shot_identity) and no candidate is staged — paid
                                # work on content that is never versioned, vs the dedup
                                # contract (raw = a real change that must append a candidate).
                                # BLOCK and require the operator to capture it through
                                # versioning first. (derive_only never dispatches — it appends
                                # a candidate — so it is exempt.) The graceful stage-candidate
                                # UX is deferred to REC-235.
                                if (
                                    not derive_only
                                    and loaded_has_manifest
                                    and not registered_body_missing_structural_metadata
                                    and ((f.beat_metadata or {}).get("shot") or {}).get("raw")
                                    != ((lb.beat_metadata or {}).get("shot") or {}).get("raw")
                                ):
                                    raise SceneContentDriftError(lb.beat_id, fresh.scene_id)
                                if not registered_body_missing_structural_metadata:
                                    lb.beat_metadata.update(refresh)
                else:
                    logger.warning(
                        "Topology drift for %s: disk=%s fresh=%s — "
                        "overwriting stale scene",
                        fresh.scene_id, loaded_ids, fresh_ids,
                    )
                    topology_drifted = True
                    if force_new_take:
                        raise RerollPreflightError(
                            "new_take_requires_single_r2v_multi_beat",
                            "--new-take target scene topology does not match the persisted scene.",
                        )
            except FileNotFoundError:
                if force_new_take:
                    raise RerollPreflightError(
                        "reroll_requires_succeeded_primary",
                        "--new-take requires an existing scene with a succeeded primary take.",
                    )
            if registered_body_missing_structural_metadata:
                if not dry_run:
                    candidate = _sanitized_candidate_scene(fresh)
                    pre_manifest = load_manifest(self.project, self.episode, group.scene_id)
                    pre_known_versions = {
                        int(entry["version"])
                        for entry in (pre_manifest or {}).get("versions", [])
                        if isinstance(entry, dict) and "version" in entry
                    }
                    pre_known_versions.add(loaded_version)
                    new_version = SceneVersionStore(self.project, self.episode).write_scene_candidate(
                        group.scene_id,
                        candidate,
                        dedup_against_versions=True,
                    )
                    if new_version in pre_known_versions:
                        print(
                            "registered-version-metadata: candidate already exists at version "
                            f"v{new_version}; active remains v{loaded_version}. "
                            "Candidate is not_derived — run a board pass then "
                            "rederive --conform to activate."
                        )
                    else:
                        print(
                            "registered-version-metadata: appended candidate "
                            f"v{new_version}; active remains v{loaded_version}. "
                            "Candidate is not_derived — run a board pass then "
                            "rederive --conform to activate."
                        )
                    candidate_versions[group.scene_id] = new_version
                    written_scene_ids.append(group.scene_id)
                    _record_written_scene(candidate, group)
                continue
            if force_new_take and not reused_loaded_scene:
                raise RerollPreflightError(
                    "reroll_requires_succeeded_primary",
                    "--new-take requires an existing scene with a succeeded primary take.",
                )
            reroll_target_id = (
                scene.beats[0].beat_id if loaded_legacy_single_r2v else fresh.beats[0].beat_id
                if (
                    force_new_take
                    and len(fresh.beats) == 1
                )
                else None
            )
            if force_new_take and reroll_target_id is None:
                raise RerollPreflightError(
                    "new_take_requires_single_r2v_multi_beat",
                    "--new-take requires exactly one r2v_multi target beat.",
                )
            preflight_scene = scene
            if force_new_take and metadata_refreshes:
                import copy

                preflight_scene = copy.deepcopy(scene)
                for beat in preflight_scene.beats:
                    refresh = metadata_refreshes.get(beat.beat_id)
                    if refresh:
                        beat.beat_metadata.update(refresh)
            reroll_target = (
                self._preflight_reroll(
                    preflight_scene,
                    target_beat_id=reroll_target_id,
                )
                if force_new_take and reroll_target_id else None
            )
            if reroll_target is not None:
                preflight_target_id = reroll_target.beat_id
                self._preflight_reroll_no_extra_dispatchable_beats(
                    preflight_scene,
                    preflight_target_id,
                )
                for beat in scene.beats:
                    refresh = metadata_refreshes.get(beat.beat_id)
                    if refresh:
                        beat.beat_metadata.update(refresh)
                reroll_target = next(
                    beat for beat in scene.beats
                    if beat.beat_id == preflight_target_id
                )
            if reroll_target is None:
                # Recovery passes run IN-MEMORY first (no saves): their
                # demotions determine the true eligible dispatch set the board
                # gate must scan (a demoted succeeded beat becomes eligible).
                self.recover_stale_takes(scene)
                # Demote any persisted "succeeded" take whose video no longer exists
                # on disk, so a loaded/stale scene re-dispatches instead of falsely
                # reporting a render that isn't there.
                phantoms = self.invalidate_phantom_succeeded_takes(scene)
                if phantoms:
                    logger.warning(
                        "run_episode_batches: %d phantom-succeeded take(s) in scene "
                        "%s had no artifact — will re-dispatch", phantoms, scene.scene_id,
                    )
                # REC-12: revalidate succeeded beats against CURRENT resolved refs.
                # The dispatch-time drift check is dead here (run_scene skips beats
                # with a primary take), so re-resolve and demote any whose refs
                # drifted since success (e.g. hero → composite-sheet promotion).
                drifted = self.revalidate_succeeded_fingerprints(scene)
                if drifted:
                    logger.warning(
                        "run_episode_batches: %d succeeded take(s) in scene %s had "
                        "ref drift — demoted to re-dispatch with updated refs",
                        drifted, scene.scene_id,
                    )
                # Board gate over the ELIGIBLE set (post-demotion accuracy),
                # BEFORE any save: a blocked batch raises here and the loaded
                # scene's in-memory mutations are discarded — disk untouched
                # (Phase 6: zero state writes before the scan passes).
                # D5: the board gate guards PAID dispatch only. A derive_only
                # run never reaches run_scene/dispatch() (see `if derive_only:
                # continue` below), so it must not be blocked from saving the
                # re-derived scene — gate the non-derive path only. Eliminates
                # the RECOIL_BOARD_GATE=0 hack for $0 derives.
                if not derive_only:
                    _preflight_board_gate(
                        project=self.project,
                        episode=self.episode,
                        beats=_dispatchable_beats_for_scene(
                            scene, force_new_take=False, reroll_beat_id=None,
                        ),
                    )
                coverage_candidate_staged = False
                if not dry_run and not derive_only and coverage_pass_refresh:
                    candidate = _sanitized_candidate_scene(fresh)
                    store = SceneVersionStore(self.project, self.episode)
                    pre_manifest = load_manifest(self.project, self.episode, group.scene_id)
                    pre_known_versions = {
                        int(entry["version"])
                        for entry in (pre_manifest or {}).get("versions", [])
                        if isinstance(entry, dict) and "version" in entry
                    }
                    pre_known_versions.add(loaded_version)
                    new_version = store.write_scene_candidate(
                        group.scene_id,
                        candidate,
                        dedup_against_versions=True,
                    )
                    if new_version == loaded_version:
                        print(
                            "coverage-pass: deduped to active — no change; "
                            f"active remains v{loaded_version}"
                        )
                    elif new_version in pre_known_versions:
                        print(
                            "coverage-pass: candidate already exists at version "
                            f"v{new_version}; active remains v{loaded_version}. "
                            "Candidate is not_derived — run a board pass then "
                            "rederive --conform to activate."
                        )
                        coverage_candidate_staged = True
                    else:
                        print(
                            "coverage-pass: appended candidate version "
                            f"v{new_version}; active remains v{loaded_version}. "
                            "Candidate is not_derived — run a board pass then "
                            "rederive --conform to activate."
                        )
                        coverage_candidate_staged = True
                    if coverage_candidate_staged:
                        candidate_versions[group.scene_id] = new_version
                        written_scene_ids.append(group.scene_id)
                        _record_written_scene(candidate, group)
                        continue
                # Gate passed — persist recovery results + metadata refreshes.
                if dry_run:
                    self._persist_active_status(
                        scene, expected_version=loaded_version, dry_run=dry_run
                    )
                    written_scene_ids.append(scene.scene_id)
                elif derive_only:
                    # REC-231 Phase 2: re-derivation is non-destructive. The freshly-
                    # clustered grouping output is a NEW shot structure — append it as an
                    # immutable candidate version (the pointer never moves) instead of
                    # overwriting the active body. The first re-derive of a legacy flat
                    # batch materializes its manifest (flat → v1 active/approved) before
                    # appending the candidate. `group.scene_id` is the PRE-derivation
                    # requested batch id (the selector target), threaded as-is in Phase 2;
                    # Phase 6 adds the identity-halt against the generated scene.scene_id.
                    candidate_version = (
                        SceneVersionStore(self.project, self.episode)
                        .write_scene_candidate(
                            group.scene_id,
                            _sanitized_candidate_scene(scene),
                            dedup_against_versions=dedup_candidate,
                        )
                    )
                    if not (
                        candidate_version == 1
                        and load_manifest(self.project, self.episode, group.scene_id) is None
                    ):
                        derive_only_candidate_appended = True
                    candidate_versions[scene.scene_id] = candidate_version
                    written_scene_ids.append(scene.scene_id)
                    _record_written_scene(scene, group)
                else:
                    # REC-231 Phase 7: a locked batch is no longer special-cased here
                    # (the force/lock clobber gate + .pre-force-*.bak overwrite is DELETED).
                    # REC-231 Phase 4: a non-derive generation pass persists recovery
                    # results + metadata refreshes as STATUS onto the ACTIVE version
                    # body via the structure-guarded writer (a defensive structural
                    # delta on a VERSIONED batch raises SceneStructureImmutableError;
                    # for an un-versioned flat batch the write is byte-identical to the
                    # pre-versioning raw save). This closes the "normal generation pass
                    # mutates the active body via raw save" gap.
                    if topology_drifted:
                        candidate = _sanitized_candidate_scene(fresh)
                        new_version = SceneVersionStore(
                            self.project, self.episode
                        ).write_scene_candidate(
                            group.scene_id,
                            candidate,
                            dedup_against_versions=True,
                        )
                        print(
                            "topology-drift: appended candidate "
                            f"v{new_version}; active remains v{loaded_version}; "
                            "not_derived — re-board then rederive --conform"
                        )
                        candidate_versions[group.scene_id] = new_version
                        written_scene_ids.append(group.scene_id)
                        _record_written_scene(candidate, group)
                        continue
                    self._persist_active_status(
                        scene, expected_version=loaded_version, dry_run=dry_run
                    )
                    guarded_writes += 1
                    written_scene_ids.append(scene.scene_id)
                    _record_written_scene(scene, group)
            else:
                # Reroll path: the CLI/API entry points pre-scan before their
                # mutating prepare call; this covers direct callers.
                _preflight_board_gate(
                    project=self.project,
                    episode=self.episode,
                    beats=[reroll_target],
                )
            if derive_only:
                continue
            # Per-beat render-attempt snapshot BEFORE run_scene. A beat is
            # "rendered THIS run" iff its non-build-error take count GROWS
            # during this run_scene call — the only signal that distinguishes
            # a current-run render from a stale prior-run take already on the
            # beat. id(b) keys the snapshot to the live Beat object (the same
            # instances are mutated in place by run_scene; scene.beats is
            # stable across the call).
            _pre = {
                id(b): _beat_render_attempt_take_count(b) for b in scene.beats
            }

            def _stamp_rendered_beats():
                for b in scene.beats:
                    if _beat_render_attempt_take_count(b) > _pre.get(id(b), 0):
                        rendered_shot_ids.extend(_beat_dailies_shot_ids(b))

            try:
                await self.run_scene(
                    scene,
                    dry_run=dry_run,
                    force_new_take=force_new_take and reroll_target is not None,
                    seed=seed,
                    make_primary=make_primary,
                    reroll_beat_id=reroll_target.beat_id if reroll_target else None,
                    strategy_override=strategy_override,
                    expected_version=loaded_version,
                )
            except BudgetExhaustedError as _budget_exc:
                self._persist_active_status(
                    scene, expected_version=loaded_version, dry_run=dry_run
                )
                # Stamp the shots whose beat actually rendered before the halt.
                # A per-beat budget-preflight halt (1725/1734) fires BEFORE
                # new_take (1821); a build-error stub (1780) dispatches nothing
                # — both leave the non-build-error delta at 0, so they are NOT
                # stamped. A partial multi-beat i2v scene stamps ONLY the beats
                # that rendered, never a cancelled sibling.
                _stamp_rendered_beats()
                _budget_exc.rendered_shot_ids = list(rendered_shot_ids)
                raise
            # Completed scene: stamp the shots whose beat rendered this run.
            # run_scene can return having dispatched nothing (empty
            # eligible_beats at 2152, or only build-error stubs at 1780) — the
            # per-beat delta is 0 for those beats, so they are NOT stamped.
            _stamp_rendered_beats()
            # REC-231 Phase 4: the post-run completion save is STATUS-ONLY (take
            # results / run state) on the ACTIVE body — a structural delta here is a
            # hard invariant error (SceneStructureImmutableError), never a silent append.
            self._persist_active_status(
                scene, expected_version=loaded_version, dry_run=dry_run
            )
            _record_written_scene(scene, group)
            scenes.append(scene)
            if force_new_take and dry_run:
                continue
            ops_log.write({"event": "batch_complete",
                           "batch_id": group.scene_id,
                           "shot_count": len(group.shots),
                           "grouping": group.identity.strategy,
                           "below_threshold": group.modality == "video_i2v"})
        if not dry_run and not force_new_take:
            skipped_ids = ",".join(scene_id for scene_id, _reason in skipped)
            print(
                f"scenes: wrote {guarded_writes} "
                "(scenes actually persisted by the guarded writer this run), "
                f"SKIPPED {len(skipped)} locked ({skipped_ids})"
            )
        _stamp_scenes_if_complete()
        if derive_only:
            summary = {
                "derive_only": True,
                "written": list(written_scene_ids),
                "skipped": [
                    {"scene_id": sid, "lock_reason": reason}
                    for sid, reason in skipped
                ],
                "dry_run": dry_run,
            }
            if candidate_versions:
                summary["candidate_versions"] = dict(candidate_versions)
            return summary
        # REC-231: the non-derive coverage-refresh / topology-drift branches APPEND a
        # not_derived candidate and `continue` WITHOUT dispatching — those batches are
        # never added to `scenes`. Expose them so the CLI does not report a no-dispatch
        # staging run as a clean success (the operator must re-board then rederive
        # --conform). `candidate_versions` on this (non-derive) path is populated ONLY by
        # those two staging branches.
        self._staged_candidates_without_dispatch = dict(candidate_versions)
        return scenes

    @staticmethod
    def _coverage_pass_from_shots(
        shots,
        *,
        pass_id: str | None,
    ) -> CoveragePass:
        """Wrap ``shots`` in a synthetic ``env``-type CoveragePass.

        One CoverageSegment is produced per shot, indexed by its position in
        ``shots``. A shot without a shot type falls back to ``"MS"``, a shot
        without a duration falls back to 5 (durations are cast to ``int``),
        and the segment prompt is the ``description`` entry of ``shot.raw``
        (empty string when absent). The pass's shot range is the first and
        last shot id, and its location is taken from the first shot; all are
        empty strings when ``shots`` is empty.

        Args:
            shots: indexable sequence of shot objects.
            pass_id: identifier used as both the pass id and its label.

        Raises:
            ValueError: if ``pass_id`` is missing or empty.
        """
        from recoil.pipeline.orchestrator.coverage_planner import (
            CoveragePass,
            CoverageSegment,
        )

        if not pass_id:
            raise ValueError("pass_id is required for synthetic coverage grouping.")

        built_segments = []
        for position, member in enumerate(shots):
            built_segments.append(
                CoverageSegment(
                    segment_index=position,
                    source_shot_id=member.shot_id,
                    shot_type=member.shot_type or "MS",
                    duration_s=int(member.duration_s or 5),
                    prompt=str((member.raw or {}).get("description") or ""),
                )
            )

        # Range endpoints and location all degrade to "" for an empty input.
        if shots:
            span = (shots[0].shot_id, shots[-1].shot_id)
        else:
            span = ("", "")
        if shots:
            location = getattr(shots[0], "location_id", "")
        else:
            location = ""

        return CoveragePass(
            pass_id=pass_id,
            episode_id="",
            shot_range=span,
            camera_side="",
            label=pass_id,
            focus_character="",
            pass_type="env",
            location_id=location,
            segments=built_segments,
        )

    @staticmethod
    def _coverage_ordinal_from_pass_id(pass_id: str, *, fallback: int) -> int:
        match = re.search(r"(?:^|_)(?:PASS|COV)_(\d{1,3})(?:_|$)", pass_id)
        if match:
            return int(match.group(1))
        return fallback

    def _group_from_batch(
        self,
        batch,
        *,
        pass_id: str | None = None,
        fallback_ordinal: int = 1,
    ) -> Group:
        """Wrap a batch in a Group carrying its grouping identity.

        With a ``pass_id`` the group uses the ``coverage`` strategy, takes its
        ordinal from the pass id (falling back to ``fallback_ordinal``), and is
        namespaced by the pass id. Without one it uses the ``continuity``
        strategy, the fallback ordinal, and the batch id as namespace.

        The modality is ``video_i2v`` when ``batch.below_threshold`` is truthy
        and ``r2v_multi`` otherwise. A ``batch_summary`` describing the batch
        is stored under ``element_config``.
        """
        if pass_id:
            strategy = "coverage"
            ordinal = self._coverage_ordinal_from_pass_id(
                pass_id,
                fallback=fallback_ordinal,
            )
        else:
            strategy = "continuity"
            ordinal = fallback_ordinal

        identity = GroupingIdentity(
            strategy=strategy,
            ordinal=ordinal,
            shot_ids=[member.shot_id for member in batch.shots],
            source_pass_id=pass_id,
        )

        members = list(batch.shots)
        namespace = pass_id or batch.batch_id
        if batch.below_threshold:
            modality = "video_i2v"
        else:
            modality = "r2v_multi"

        summary = {
            "batch_id": batch.batch_id,
            "shot_ids": [member.shot_id for member in batch.shots],
            "shared_location_id": batch.shared_location_id,
            "shared_characters": batch.shared_characters,
            "total_duration_s": batch.total_duration_s,
        }
        return Group(
            identity=identity,
            shots=members,
            scene_id=namespace,
            modality=modality,
            generation_config={},
            element_config={"batch_summary": summary},
        )

    def _scene_from_batch(self, batch, pass_id: str | None = None) -> "Scene":
        return self._scene_from_group(
            self._group_from_batch(batch, pass_id=pass_id, fallback_ordinal=1)
        )

    def _scene_from_group(self, group: Group) -> "Scene":
        """Turn a Group of CanonicalShots into a Scene made of Beats.

        Shots are serialized with ``dataclasses.asdict`` before being placed
        in ``beat_metadata`` so the metadata stays JSON-serializable for
        save_scene; _build_workflow_for_beat rebuilds CanonicalShot(**d) when
        reading it back.

        The scene namespace is ``group.scene_id`` (the source pass id for
        coverage groups, the batch id for continuity groups). Every beat is
        stamped with the grouping identity later used by the filename writer.

        ``video_i2v`` groups yield one beat per shot; when such a group holds
        more than one shot, each beat gets its own ``solo`` grouping identity
        instead of the group's. Any other modality yields a single
        ``r2v_multi`` beat carrying every shot under ``batch_shots``.
        """
        namespace = group.scene_id
        shared_grouping = group.identity.to_dict()

        # Start from the caller-supplied summary and fill in only what is
        # missing. Defaults are computed eagerly, in this fixed order.
        summary = dict(group.element_config.get("batch_summary") or {})
        members = group.shots
        summary.setdefault("batch_id", namespace)

        member_ids = [member.shot_id for member in members]
        summary.setdefault("shot_ids", member_ids)

        if members:
            lead_location = getattr(members[0], "location_id", None)
        else:
            lead_location = None
        summary.setdefault("shared_location_id", lead_location)

        cast_ids = set()
        for member in members:
            for performer in member.characters or []:
                if hasattr(performer, "char_id"):
                    cast_ids.add(performer.char_id)
                else:
                    cast_ids.add(str(performer))
        summary.setdefault("shared_characters", sorted(cast_ids))

        runtime = sum(float(member.duration_s or 0.0) for member in members)
        summary.setdefault("total_duration_s", runtime)

        def _with_group_fields(payload: dict, grouping_override: dict | None = None) -> dict:
            # Adds the group-level fields to a beat's metadata in place. Each
            # beat receives its own shallow copies of the shared dicts.
            payload.update(
                grouping=dict(grouping_override or shared_grouping),
                generation_config=dict(group.generation_config or {}),
                element_config=dict(group.element_config or {}),
            )
            if group.prompt_directive:
                payload["prompt_directive"] = group.prompt_directive
            return payload

        from_pass = bool(group.identity.source_pass_id)

        if group.modality == "video_i2v":
            # A multi-shot i2v group is split: every beat is its own "solo".
            needs_solo_identity = len(members) > 1
            beats = []
            for member in members:
                if from_pass:
                    beat_id = f"{namespace}__{member.shot_id}"
                else:
                    beat_id = member.shot_id
                payload = {
                    "scene_id": namespace,
                    "shot": dataclasses.asdict(member),
                    "modality": "video_i2v",
                    "batch_summary": {
                        "batch_id": namespace,
                        "shot_ids": [member.shot_id],
                    },
                }
                solo_grouping = None
                if needs_solo_identity:
                    solo_grouping = GroupingIdentity(
                        strategy="solo",
                        ordinal=0,
                        shot_ids=[member.shot_id],
                        source_pass_id=None,
                    ).to_dict()
                beats.append(
                    Beat(
                        beat_id=beat_id,
                        max_takes=self.max_takes,
                        beat_metadata=_with_group_fields(payload, solo_grouping),
                    )
                )
        else:
            if from_pass:
                beat_id = f"{namespace}__cov"
            else:
                beat_id = namespace
            payload = {
                "scene_id": namespace,
                "shot": dataclasses.asdict(group.shots[0]),
                "batch_shots": [dataclasses.asdict(member) for member in group.shots],
                "modality": "r2v_multi",
                "batch_summary": summary,
            }
            beats = [
                Beat(
                    beat_id=beat_id,
                    max_takes=self.max_takes,
                    beat_metadata=_with_group_fields(payload),
                ),
            ]

        scene_metadata = {
            "episode": self.episode,
            "project": self.project,
            "batch": True,
            "grouping": dict(shared_grouping),
        }
        return Scene(
            scene_id=namespace,
            beats=beats,
            scene_metadata=scene_metadata,
        )

    async def run_episode(
        self, sequence_filter: Optional[list[str]] = None
    ) -> list[Scene]:
        """Run every plan sequence (or only the filtered ones) in order.

        Scenes are first initialized from the plan. For each sequence the
        active scene body and its version are loaded so prior takes are kept;
        a sequence with nothing on disk gets a freshly initialized scene at
        version 1. Stale takes are recovered before the scene is dispatched.

        Args:
            sequence_filter: sequence ids to run. When omitted or empty, all
                sequences in the plan are run.

        Returns:
            The processed scenes, in the order they were run.
        """
        init_scenes_from_plan(
            self.project,
            self.episode,
            self.plan,
            max_takes=self.max_takes,
        )

        if sequence_filter:
            targets = sequence_filter
        else:
            targets = list((self.plan.get("sequences") or {}).keys())

        finished: list[Scene] = []
        for sequence_id in targets:
            # REC-231 Phase 4: load the ACTIVE version body + its version via
            # the pointer; the version is threaded into run_scene's saves.
            try:
                loaded = load_scene_active_with_version(
                    self.project, self.episode, sequence_id
                )
            except FileNotFoundError:
                loaded = (self.init_scene(sequence_id), 1)
            scene, expected_version = loaded

            self.recover_stale_takes(scene)
            await self.run_scene(scene, expected_version=expected_version)
            finished.append(scene)
        return finished
