#!/usr/bin/env python3
"""CLI entry point for Recoil video generation via coverage passes."""

import argparse
import asyncio
import errno
import hashlib
import json
import logging
import os
import signal
import sys
import tempfile
from pathlib import Path
from types import SimpleNamespace

# ── sys.path bootstrap ───────────────────────────────────────────
# CP-1 (2026-06-01): centralized through core.paths. The
# `core` package is installed via pyproject.toml, so the
# bootstrap import below works whether this CLI is invoked
# inside a `pip install -e .` venv or directly via path.
#
# Both roots are derived from this file's resolved location and are prepended
# to sys.path only when not already present, so re-importing this module does
# not add duplicate entries. This must run before any `recoil.*` import below.
_HERE = Path(__file__).resolve()
_RECOIL_ROOT = _HERE.parent.parent.parent      # .../recoil/
_REPO_ROOT = _RECOIL_ROOT.parent               # directory that contains recoil/
if str(_REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(_REPO_ROOT))        # bootstrap so `recoil.*` resolves pre-install
if str(_RECOIL_ROOT) not in sys.path:
    sys.path.insert(0, str(_RECOIL_ROOT))      # bootstrap so `core` resolves pre-install

from recoil.core.paths import (  # noqa: E402
    ensure_pipeline_importable,
    projects_root as resolve_projects_root,
    ProjectPaths as CoreProjectPaths,
)
ensure_pipeline_importable()                     # injects recoil/pipeline/ onto sys.path

# ── Engine imports (AFTER sys.path bootstrap) ────────────────────
from orchestrator.coverage_validator import validate_all_passes, Severity  # noqa: E402
from orchestrator.coverage_planner import CoveragePass  # noqa: E402
from recoil.execution.step_types import ProjectPaths  # noqa: E402
from recoil.execution.pass_store import PassStore  # noqa: E402
from recoil.execution.execution_store import ExecutionStore  # noqa: E402
from recoil.execution.step_runner import StepRunner  # noqa: E402
from recoil.core.model_profiles import get_provider_cost_per_second  # noqa: E402
from recoil.pipeline._lib.author_strategies import AUTHOR_STRATEGIES  # noqa: E402
from recoil.pipeline._lib.board_builder import (  # noqa: E402
    BoardBuilderError,
    board_record_to_cache,
    build_and_dispatch_board,
    build_with_auto_reroll,
    compute_source_sha256,
    render_board_finish,
)
from recoil.pipeline._lib.plan_loader import load_plan  # noqa: E402
from recoil.pipeline._lib import derivation_manifest  # noqa: E402
from recoil.pipeline._lib.derivation_sha import (  # noqa: E402
    board_content_freshness_sha,
    plan_structural_sha,
    shotset_hash,
)
from recoil.pipeline._lib.story_gate import append_label, run_calibration  # noqa: E402
from recoil.pipeline._lib.breakdown_extract import BreakdownExtractError  # noqa: E402
from recoil.pipeline.core.persistence import (  # noqa: E402
    active_scene_body_path,
    load_scene_active,
    load_scene_active_with_version,
    save_active_scene_status,
    SceneIdentityMismatchError,
    SceneVersionConflictError,
)
from recoil.pipeline.core.take import Scene  # noqa: E402  (REC-231 Phase 4 status closures)
from recoil.pipeline.core.receipts import utc_now_iso8601  # noqa: E402
from recoil.pipeline.orchestrator.episode_runner import (  # noqa: E402
    BoardGateError,
    BudgetExhaustedError,
    EpisodeRunner,
    RerollPreflightError,
    SceneContentDriftError,
    _preflight_board_gate,
)
from recoil.pipeline.orchestrator.ingest_pipeline import (  # noqa: E402
    IngestPipeline,
    LocationUnresolvedError,
)
from recoil.pipeline.orchestrator.batch_selector import parse_batch_selector  # noqa: E402
from recoil.pipeline.orchestrator.from_script_target import (  # noqa: E402
    resolve_from_script_target,
)
from recoil.pipeline.orchestrator.learning_engine import LearningEngine  # noqa: E402
from recoil.pipeline.orchestrator.strategy_registry import StrategyEngine  # noqa: E402
from recoil.pipeline.tools.breakdown_gate_cli import _run_gate  # noqa: E402

# ── Constants ────────────────────────────────────────────────────
# Suffix appended to "ep_NNN" to name the per-episode lockfile that
# acquire_episode_lock() creates in the project's passes directory.
LOCKFILE_SUFFIX = "_pass_state.lock"

# Process exit codes returned by the CLI commands in this module.
EXIT_OK = 0
# Used for a command result whose "success" is False (see
# _storyboard_result_exit_code).
EXIT_PARTIAL = 1
# Used for invalid selectors / missing persisted scenes (see _run_scene_lock).
EXIT_VALIDATION = 2
# NOTE(review): presumably returned when acquire_episode_lock() refuses because
# another run holds the episode lock — confirm against the caller.
EXIT_LOCKED = 3
# NOTE(review): presumably the story-gate evaluation failure code — confirm.
EXIT_STORY_GATE_EVAL = 4
# NOTE(review): 130 matches the shell convention 128 + SIGINT(2) — confirm this
# is the intent at the signal-handling site.
EXIT_SIGNALLED = 130

# Module logger; a fixed name is used rather than __name__.
logger = logging.getLogger("recoil.generate")


def _json_line(payload: dict) -> str:
    """Serialize *payload* to a compact JSON string.

    Values the JSON encoder does not support natively are converted with
    ``str`` instead of raising ``TypeError``.
    """
    encoded = json.dumps(payload, default=str)
    return encoded


def _sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    Returns ``""`` when the file does not exist (including when a parent
    component of *path* is not a directory). Other I/O errors propagate.

    The file is opened directly instead of being probed with ``exists()``
    first, so a file that disappears between the check and the read can no
    longer raise; it is hashed in fixed-size chunks so large artifacts are not
    loaded into memory at once.
    """
    digest = hashlib.sha256()
    try:
        with path.open("rb") as handle:
            while chunk := handle.read(1 << 20):
                digest.update(chunk)
    except (FileNotFoundError, NotADirectoryError):
        return ""
    return digest.hexdigest()


def _current_operator() -> str:
    """Return the operator name used to attribute lock actions.

    The first non-empty value among the ``RECOIL_LOCKED_BY``, ``RECOIL_USER``
    and ``USER`` environment variables wins; ``"JT"`` is the fallback.
    """
    for variable in ("RECOIL_LOCKED_BY", "RECOIL_USER", "USER"):
        candidate = os.environ.get(variable)
        if candidate:
            return candidate
    return "JT"


def _shotset_hash_from_beat(beat) -> str | None:
    grouping = (beat.beat_metadata or {}).get("grouping") or {}
    if not isinstance(grouping, dict):
        return None
    shotset_hash_val = grouping.get("shotset_hash")
    if not shotset_hash_val:
        shot_ids = grouping.get("shot_ids") or []
        if shot_ids:
            shotset_hash_val = shotset_hash(shot_ids)
    return shotset_hash_val


def _require_board_shotset_hash(beat) -> str:
    """Return the beat's shotset hash, validating its grouping metadata first.

    Validation is delegated to ``_require_board_grouping_metadata`` (which
    raises ``RuntimeError`` on a violation). The stamped hash is preferred;
    otherwise the hash is computed from the grouping's ``shot_ids``.
    """
    grouping = _require_board_grouping_metadata(beat)
    stamped = grouping.get("shotset_hash")
    if stamped:
        return stamped
    return shotset_hash(grouping.get("shot_ids") or [])


def _require_board_grouping_metadata(beat) -> dict:
    """Return the beat's grouping metadata, or raise if it cannot yield a hash.

    Raises:
        RuntimeError: when the grouping entry is missing, empty or not a dict;
            when it carries neither a stamped ``shotset_hash`` nor any
            ``shot_ids``; or when both are present and the stamped hash does
            not match the hash computed from ``shot_ids``.
    """

    def _no_derivable_hash() -> RuntimeError:
        # Built lazily so the beat id is only looked up on the failure path.
        return RuntimeError(
            f"L2: board decision on a beat with no derivable shotset_hash "
            f"(L1 invariant violated): beat={getattr(beat, 'beat_id', '?')}"
        )

    grouping = (beat.beat_metadata or {}).get("grouping")
    if not (isinstance(grouping, dict) and grouping):
        raise _no_derivable_hash()

    stamped_hash = grouping.get("shotset_hash")
    shot_ids = grouping.get("shot_ids") or []
    if not (stamped_hash or shot_ids):
        raise _no_derivable_hash()

    if stamped_hash and shot_ids:
        computed_hash = shotset_hash(shot_ids)
        if stamped_hash != computed_hash:
            raise RuntimeError(
                f"L2: board decision shotset_hash mismatch "
                f"(L1 invariant violated): beat={getattr(beat, 'beat_id', '?')} "
                f"stamped={stamped_hash} computed={computed_hash}"
            )
    return grouping


def _board_script_span_provenance(
    project: str,
    episode: int,
    covered_shot_ids: list[str],
) -> tuple[dict[str, str | None], str | None]:
    spans = {shot_id: None for shot_id in covered_shot_ids}
    if not covered_shot_ids:
        return spans, None
    try:
        plan_path = (
            CoreProjectPaths.for_project(project).plans_dir
            / f"ep_{episode:03d}_plan.json"
        )
        plan = load_plan(plan_path)
        live_spans = {
            shot.shot_id: (shot.raw or {}).get("source_text_hash")
            for shot in plan.shots
        }
    except Exception as exc:  # noqa: BLE001 - approval must degrade, not fail.
        logger.warning(
            "board provenance unavailable for %s ep_%03d: %s",
            project,
            episode,
            exc,
        )
        return spans, None

    unresolved: list[str] = []
    for shot_id in covered_shot_ids:
        if shot_id not in live_spans or live_spans[shot_id] is None:
            unresolved.append(shot_id)
            continue
        spans[shot_id] = live_spans[shot_id]
    if unresolved:
        logger.warning(
            "board provenance unresolved shots for %s ep_%03d: %s",
            project,
            episode,
            ",".join(unresolved),
        )
    if not any(value is not None for value in spans.values()):
        return spans, None
    return spans, board_content_freshness_sha(spans)


def _scene_id_from_batch_for_lock(batch: str, episode: int) -> tuple[str | None, dict | None]:
    """Resolve a ``--batch`` value to a persisted scene id for lock/unlock.

    Returns ``(scene_id, None)`` on success or ``(None, error_payload)`` when
    the value is neither a parseable selector for *episode* nor a raw
    ``BATCH_``/``ONER_`` scene id.
    """

    def _invalid(message: str) -> dict:
        return {
            "success": False,
            "error": "invalid_batch_selector",
            "message": message,
        }

    selector = parse_batch_selector(batch)
    if selector is None:
        # Not a selector: accept an already-persisted scene id verbatim.
        if batch.startswith(("BATCH_", "ONER_")):
            return batch, None
        return None, _invalid(
            f"{batch!r} is not a valid batch selector or persisted scene id "
            "(expected EP###_CONT_###, EP###_ONER_###, BATCH_###, or ONER_###)."
        )

    if selector.episode == episode:
        return selector.scene_id, None
    return None, _invalid(
        f"selector episode EP{selector.episode:03d} does not match "
        f"--episode {episode}."
    )


def _build_step_runner_for_episode(project: str, episode: int) -> StepRunner:
    """Build a ``StepRunner`` for one episode of *project*.

    The runner is wired to the episode's ``ProjectPaths`` and to an
    ``ExecutionStore`` opened with ``migrate=False``.
    """
    episode_paths = ProjectPaths.for_episode(project, episode)
    execution_store = ExecutionStore(project, migrate=False)
    return StepRunner(store=execution_store, paths=episode_paths, episode=episode)


def _stamp_board_ssot(
    project: str, episode: int, beat, *, scene_version: int | None = None
) -> None:
    """D2/L2: mirror a board decision into manifest.execution.boards (the
    SSOT), keyed by shotset_hash, after the Beat.board cache save succeeds.

    FAIL LOUD if a board-bearing beat has no derivable shotset_hash — L1
    guarantees one on every board-bearing unit, so absence is an integrity
    violation, NOT a skip.

    Args:
        project: project identifier.
        episode: episode number.
        beat: the board-bearing beat; its ``board`` and ``beat_metadata`` are read.
        scene_version: active scene version the board was approved against,
            recorded verbatim on the SSOT record.

    Raises:
        RuntimeError: from the shotset-hash validation when the beat's grouping
            metadata cannot yield a (consistent) shotset_hash.
    """
    grouping = (beat.beat_metadata or {}).get("grouping", {})
    # Validates the grouping metadata; raises before anything is loaded/written.
    shotset_hash_val = _require_board_shotset_hash(beat)
    board = beat.board or {}
    # CRITICAL (R2): only record photoreal_artifact if the file actually exists
    # on disk. The stale-photoreal-pop retry (:956-957) pops a missing-file
    # photoreal from the cache before re-finishing; recording the stale path
    # here would poison L4 if the finish retry then fails. Resolve the path the
    # SAME way the live approve block does (generate.py:948-955): expanduser,
    # and project-root-relative when not absolute.
    photoreal = board.get("photoreal_artifact")
    photoreal_ok = False
    if isinstance(photoreal, str) and photoreal:
        proot = CoreProjectPaths.for_project(project).project_root
        p = Path(photoreal).expanduser()
        if not p.is_absolute():
            p = proot / p
        photoreal_ok = p.is_file()
    manifest = derivation_manifest.load(project, episode)
    # The grouping validator accepts a stamped hash with a missing/null
    # ``shot_ids``; normalize with ``or []`` so that shape stamps an empty
    # coverage list instead of raising TypeError from ``list(None)``.
    covered_shot_ids = list(grouping.get("shot_ids") or [])
    shot_script_spans, content_freshness_sha = _board_script_span_provenance(
        project,
        episode,
        covered_shot_ids,
    )
    record = {
        "status": board.get("status"),                          # "approved" | "rejected"
        "artifact": board.get("artifact"),
        "photoreal_artifact": photoreal if photoreal_ok else None,  # validated-exists; None until finish (stamp-site 2)
        "source_sha256": board.get("source_sha256"),
        "fingerprint_version": board.get("fingerprint_version", 1),
        "model": board.get("model"),
        "provider": board.get("provider"),
        "fallback_from": board.get("fallback_from"),
        # forward provenance for the deferred rederive reconciliation pass; no live reader yet.
        "plan_structural_sha_at_approval": (
            manifest.get("stages", {}).get("plan", {}).get("structural_sha")
        ),
        "covered_shot_ids": covered_shot_ids,
        "shot_script_spans": shot_script_spans,
        "content_freshness_sha": content_freshness_sha,
        "approved_by": board.get("approved_by"),
        "updated_at": board.get("updated_at"),
        # REC-231: the active scene version this board was approved against. boards[] is
        # keyed by shotset_hash (shot-id SET), which sibling versions differing only in
        # raw/description SHARE; the spend resolver uses this to refuse a board approved
        # against a different active version (else a revert could dispatch a paid render
        # with another version's board).
        "scene_version": scene_version,
    }
    # REC-240: carry the composition_ref seam into the SSOT record (the spend
    # resolver reads the SSOT record, not Beat.board, for grouped board-bearing
    # beats). Only when present -> an absent seam omits the key, keeping legacy
    # records byte-identical; a present seam survives to resolve_board_for_spend
    # so the manifest-view indirection actually fires on the paid path.
    if board.get("composition_ref") is not None:
        record["composition_ref"] = board["composition_ref"]
    # Board approval is single-writer/per-episode-serial (same contract as
    # stamp_board), so the manifest loaded microseconds earlier in this
    # synchronous call cannot clobber a concurrent writer; there is none.
    derivation_manifest.stamp_board(
        project,
        episode,
        shotset_hash_val,
        record,
        manifest=manifest,
    )


def _single_r2v_board_beat(scene: Scene):
    """Return the scene's only ``r2v_multi`` beat.

    Raises:
        RuntimeError: when the scene has zero or more than one beat whose
            ``beat_metadata["modality"]`` is ``"r2v_multi"``.
    """
    matches = []
    for candidate in scene.beats:
        metadata = candidate.beat_metadata or {}
        if metadata.get("modality") == "r2v_multi":
            matches.append(candidate)
    if len(matches) == 1:
        return matches[0]
    raise RuntimeError("board SSOT stamp requires exactly one r2v_multi target beat")


# ── Lock management ───────────────────────────────────────────────

def _clear_stale_lock_or_raise(lock_path: Path, raw: str, episode: int) -> None:
    """Delete *lock_path* when its recorded holder is gone; raise if it is live.

    *raw* is the stripped lockfile content. Empty content, non-integer content,
    a non-positive pid and a pid with no matching process are all treated as
    stale and the lockfile is removed.

    Raises:
        RuntimeError: when the recorded pid is alive, or when liveness cannot
            be ruled out (permission denied or an unexpected OS error).
    """
    if not raw:
        # Empty lockfile — stale; delete and let the caller re-acquire.
        logger.warning("Empty lockfile found; auto-cleaning %s", lock_path)
        lock_path.unlink(missing_ok=True)
        return

    try:
        pid = int(raw)
    except ValueError:
        logger.warning("Non-integer lockfile content; auto-cleaning %s", lock_path)
        lock_path.unlink(missing_ok=True)
        return

    if pid <= 0:
        # kill(0, sig) / kill(-n, sig) address process groups, not a single
        # holder, so probing them would make a corrupt lock look alive forever.
        logger.warning("Non-positive lockfile pid %d; auto-cleaning %s", pid, lock_path)
        lock_path.unlink(missing_ok=True)
        return

    try:
        os.kill(pid, 0)  # signal 0: existence/permission probe, nothing is sent
    except (ProcessLookupError, OverflowError):
        # Dead process, or a value too large to be a real pid.
        logger.warning("stale lockfile pid %d; auto-cleaning", pid)
        lock_path.unlink(missing_ok=True)
        return
    except OSError as exc:
        if exc.errno == errno.ESRCH:
            # Dead process (POSIX variant)
            logger.warning("stale lockfile pid %d; auto-cleaning", pid)
            lock_path.unlink(missing_ok=True)
            return
        if exc.errno == errno.EPERM:
            # Process exists, foreign-owned — treat as LIVE
            raise RuntimeError(
                f"Generation already running for episode {episode} "
                f"(pid={pid}, foreign-owned)"
            )
        # Unknown OS error — err on the side of safety
        raise RuntimeError(
            f"Generation already running for episode {episode} "
            f"(pid={pid})"
        )

    # No exception — process is alive
    raise RuntimeError(
        f"Generation already running for episode {episode} (pid={pid})"
    )


def acquire_episode_lock(project_root: Path, episode: int) -> Path:
    """Acquire an exclusive lockfile for the given episode.

    Returns the lock path on success. Raises RuntimeError if a live
    process already holds the lock, or if another process acquires it
    concurrently. The caller owns cleanup via release_episode_lock() in a
    finally block.

    The pid is written to a temp file first and then published under the lock
    name, so the lockfile is never observable in an empty, half-written state.
    Publication uses a hard link, which fails if the lock name already exists,
    so a lock created by another process after the staleness check is not
    silently overwritten. Where hard links are unsupported the original
    rename-based publish is used as a fallback.
    """
    lock_path = (
        CoreProjectPaths.from_root(project_root).passes_dir
        / f"ep_{episode:03d}{LOCKFILE_SUFFIX}"
    )
    lock_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        raw = lock_path.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        raw = None  # no lock on disk (or it vanished just now)
    if raw is not None:
        _clear_stale_lock_or_raise(lock_path, raw, episode)

    fd, tmp_path = tempfile.mkstemp(
        dir=str(lock_path.parent), prefix=".lock_", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(str(os.getpid()))
        try:
            os.link(tmp_path, str(lock_path))
        except FileExistsError:
            raise RuntimeError(
                f"Generation already running for episode {episode} "
                f"(lock acquired concurrently)"
            ) from None
        except (OSError, NotImplementedError):
            # Hard links unavailable on this filesystem/platform.
            os.replace(tmp_path, str(lock_path))
    finally:
        # Removes the temp name after a successful link or any failure; after
        # a successful os.replace it is already gone, which is fine.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass

    logger.debug("Acquired lockfile %s (pid=%d)", lock_path, os.getpid())
    return lock_path


def release_episode_lock(lock_path: Path) -> None:
    """Delete the episode lockfile at *lock_path*; a missing file is not an error."""
    try:
        lock_path.unlink()
    except FileNotFoundError:
        pass  # already released
    logger.debug("Released lockfile %s", lock_path)


def _remove_transient_empty_lock_parent(lock_parent: Path | None, *, created: bool) -> None:
    if lock_parent is None or not created:
        return
    try:
        lock_parent.rmdir()
    except FileNotFoundError:
        pass
    except OSError:
        pass


def _load_selected_canonical_plan(
    *,
    paths: ProjectPaths,
    episode_str: str,
    pass_ids: list[str] | None,
    selected_dicts: list[dict],
    force_new_take: bool = False,
):
    """Load the episode plan and narrow it to the selected coverage pass shots.

    With no *pass_ids* the plan is returned untouched. Otherwise the plan's
    shots are filtered to the shot ids named by the selected passes (segment
    ``source_shot_id`` values plus any ``shot_range`` entries), and missing
    ``location_id`` / ``characters`` on the remaining shots are filled from
    each pass's ``element_config``.

    Raises:
        RerollPreflightError: when *force_new_take* is set and the selection
            resolves to no shots.
    """
    canonical_plan = load_plan(paths.plans_dir / f"{episode_str}_plan.json")
    if not pass_ids:
        return canonical_plan

    def _segment_shot_ids(pass_dict: dict) -> list[str]:
        # Shot ids named by the pass's segments, stringified, falsy ids dropped.
        return [
            str(segment.get("source_shot_id"))
            for segment in (pass_dict.get("segments", []) or [])
            if segment.get("source_shot_id")
        ]

    def _unresolved_reroll() -> RerollPreflightError:
        return RerollPreflightError(
            "reroll_requires_succeeded_primary",
            "--new-take could not resolve any shots for the --new-take target pass.",
        )

    wanted_shot_ids: set[str] = set()
    for pass_dict in selected_dicts:
        wanted_shot_ids.update(_segment_shot_ids(pass_dict))
        # shot_range entries are added as-is; the canonical plan filters by
        # exact match.
        wanted_shot_ids.update(
            str(shot_id) for shot_id in (pass_dict.get("shot_range") or []) if shot_id
        )

    if wanted_shot_ids:
        canonical_plan.shots = [
            shot for shot in canonical_plan.shots
            if shot.shot_id in wanted_shot_ids
        ]
        if force_new_take and not canonical_plan.shots:
            raise _unresolved_reroll()
    elif force_new_take:
        raise _unresolved_reroll()

    # Canonical-plan shots are under-populated (location_id/characters are
    # often None); the coverage pass's element_config carries the real
    # character + location used for ref resolution. Inject it so r2v_multi
    # ships the character/location refs instead of dispatching prompt-only
    # (the "zero reference_images" path). Only fills gaps — never overwrites.
    element_config_by_shot: dict = {}
    for pass_dict in selected_dicts:
        element_config = pass_dict.get("element_config") or {}
        location = element_config.get("location_id")
        characters = [
            element.get("char_id")
            for element in (element_config.get("character_elements") or [])
            if isinstance(element, dict) and element.get("char_id")
        ]
        for shot_id in _segment_shot_ids(pass_dict):
            element_config_by_shot[shot_id] = (location, characters)

    for shot in canonical_plan.shots:
        injected = element_config_by_shot.get(shot.shot_id)
        if not injected:
            continue
        location, characters = injected
        if location and not shot.location_id:
            shot.location_id = location
        if characters and not shot.characters:
            shot.characters = list(characters)

    return canonical_plan


def _resolve_grouping(requested_grouping: str, pass_ids: list[str] | None) -> str:
    if requested_grouping == "auto":
        return "coverage" if pass_ids is not None else "continuity"
    return requested_grouping


def _load_coverage_pass_dicts(paths: ProjectPaths, episode: int) -> list[dict]:
    """Read the episode's coverage-pass file and return its pass dicts.

    The file may hold either a bare JSON list of passes or an object whose
    ``"passes"`` key holds that list (``[]`` when the key is absent).
    File-not-found and JSON decode errors propagate to the caller.
    """
    passes_file = paths.coverage_passes_dir / f"ep_{episode:03d}_passes.json"
    with passes_file.open(encoding="utf-8") as handle:
        payload = json.load(handle)
    if isinstance(payload, list):
        return payload
    return payload.get("passes", [])


def _coverage_passes_file_error(paths: ProjectPaths, episode: int, exc: Exception) -> dict:
    """Translate a coverage-pass file failure into a structured error payload.

    A ``FileNotFoundError`` yields ``passes_file_missing`` with the expected
    path; any other exception yields ``passes_file_read_error`` with its text.
    """
    expected_file = paths.coverage_passes_dir / f"ep_{episode:03d}_passes.json"
    if not isinstance(exc, FileNotFoundError):
        return {"success": False, "error": "passes_file_read_error", "message": str(exc)}
    payload = {"success": False, "error": "passes_file_missing"}
    payload["path"] = str(expected_file)
    return payload


def _coverage_passes_staleness_error(
    paths: ProjectPaths, project: str, episode: int
) -> dict | None:
    """D3 consumer staleness guard (REC-164 Phase 4).

    Compare the current plan's structural_sha against the plan_structural_sha the
    coverage_passes producer recorded in the derivation manifest. Return a
    structured ``coverage_passes_stale`` error dict when the locked passes were
    built against a different plan structural_sha; return None otherwise.

    REFUSES ONLY — never rebuilds (that is D3's downstream half). Reads the
    manifest stamped by Phase 3 + the live plan; writes nothing. Mirrors the
    producer's plan load (``json.loads(plan_path.read_text(...))``) so the two
    structural shas compare byte-for-byte.

    Refuses ONLY on DRIFT (recorded present but != current plan structural_sha).
    ABSENT provenance (no stages / no coverage_passes stage / no source / None
    recorded — legacy passes that predate the manifest, or flows that don't
    populate it like reroll / direct pass-scoped runs) WARNS and proceeds: the
    guard protects against the "old script" drift case, not absence. Once a
    rederive stamps the producer (Phase 3), a real mismatch is caught.
    (REC-164 follow-up: absent→permissive, so the guard is non-breaking for
    existing flows while still drift-protective.)

    Returns:
        ``None`` when the passes are fresh, the plan file is missing, or no
        provenance is recorded; otherwise the ``coverage_passes_stale`` dict.
    """
    plan_path = paths.plans_dir / f"ep_{episode:03d}_plan.json"
    try:
        plan_dict = json.loads(plan_path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No plan on disk (reroll / direct pass-scoped flows that don't derive a
        # plan) → nothing to compare the passes against; the guard does not apply.
        return None
    current_plan_structural = plan_structural_sha(plan_dict)

    manifest = derivation_manifest.load(project, episode)
    # Every level may be missing or null in a legacy/un-stamped manifest; all of
    # those shapes mean "absent provenance" and must fall through to the
    # permissive branch rather than raise KeyError/AttributeError.
    stages = manifest.get("stages") or {}
    producer_stage = stages.get("coverage_passes") or {}
    recorded = (producer_stage.get("source") or {}).get("plan_structural_sha")
    if recorded is None:
        logger.warning(
            "coverage_passes for ep_%03d have no recorded plan_structural_sha "
            "(legacy/un-stamped); proceeding without staleness check — re-lock via "
            "rederive to enable drift protection.",
            episode,
        )
        return None
    if recorded != current_plan_structural:
        return {
            "success": False,
            "error": "coverage_passes_stale",
            "message": (
                f"coverage_passes for ep_{episode:03d} were built against plan "
                f"structural_sha {recorded} but the current plan is "
                f"{current_plan_structural}. Re-run: rederive --project {project} "
                f"--episode {episode} (rebuilds passes, $0)."
            ),
            "recorded": recorded,
            "current": current_plan_structural,
        }
    return None


# ── Batch reroll (REC-111) ────────────────────────────────────────

def _run_batch_reroll(
    *,
    project: str,
    episode: int,
    batch: str,
    strategy: str | None,
    seed: int | None,
    make_primary: bool,
    budget_usd: float,
    dry_run: bool,
) -> dict:
    """Reroll a single continuity/oner batch via its persisted scene (REC-111).

    BYPASSES coverage-pass loading + plan reassembly entirely: maps the --batch
    selector to the persisted scene file, cross-checks grouping metadata, clears
    any stale primary (REC-120), then dispatches one fresh take through
    ``run_scene`` — the same scene-level entry the episode path drives per-scene
    (NOT ``run_episode_batches``).

    Args:
        project: project identifier.
        episode: episode number; must match the selector's episode.
        batch: the ``--batch`` selector string.
        strategy: optional strategy override forwarded to ``run_scene``.
        seed: optional seed forwarded to ``run_scene``.
        make_primary: forwarded to ``run_scene``.
        budget_usd: budget given to the ``EpisodeRunner``.
        dry_run: when true, only a cost estimate is returned; nothing is
            prepared, persisted or dispatched.

    Returns:
        A result dict. ``"success"`` is False with an ``"error"`` code for
        every handled failure (invalid selector, missing scene, wrong beat
        shape, board gate, version conflict, preflight, budget, failed take);
        otherwise it describes the dry-run estimate or the dispatched take.
    """
    from recoil.pipeline.orchestrator.batch_selector import verify_scene_grouping_metadata

    selector = parse_batch_selector(batch)
    if selector is None:
        return {
            "success": False,
            "error": "invalid_batch_selector",
            "message": (
                f"--batch {batch!r} is not a valid selector "
                "(expected EP###_CONT_### or EP###_ONER_###)."
            ),
        }
    if selector.episode != episode:
        return {
            "success": False,
            "error": "invalid_batch_selector",
            "message": (
                f"--batch episode EP{selector.episode:03d} does not match "
                f"--episode {episode}."
            ),
        }

    episode_str = f"ep_{episode:03d}"
    # REC-231 Phase 4: a CLI batch-reroll reads the ACTIVE version body via the
    # pointer (so a reroll after conform(v2) rerolls v2, not the flat/v1), capturing
    # the version atomically so the downstream status writes target that version.
    path = active_scene_body_path(project, episode_str, selector.scene_id)
    try:
        scene, expected_version = load_scene_active_with_version(
            project, episode_str, selector.scene_id
        )
    except FileNotFoundError:
        return {
            "success": False,
            "error": "batch_scene_missing",
            "message": f"No persisted scene for --batch {batch}.",
            "path": str(path),
        }

    if len(scene.beats) != 1:
        return {
            "success": False,
            "error": "new_take_requires_single_r2v_multi_beat",
            "message": "--batch requires the persisted scene to contain exactly "
                       "one beat (fail-before-mutation).",
        }
    # Guard against beat_metadata being None (as the other helpers in this
    # module do) so a metadata-less beat yields the structured error below
    # instead of an AttributeError traceback.
    r2v_beats = [
        beat for beat in scene.beats
        if (beat.beat_metadata or {}).get("modality") == "r2v_multi"
    ]
    if len(r2v_beats) != 1:
        return {
            "success": False,
            "error": "new_take_requires_single_r2v_multi_beat",
            "message": "--batch requires exactly one r2v_multi target beat.",
        }
    beat = r2v_beats[0]

    try:
        verify_scene_grouping_metadata(scene, selector, beat)
    except BoardGateError as exc:
        return {
            "success": False,
            "error": "board_gate_blocked",
            "reason": exc.reason,
            "message": str(exc),
            "beat_id": exc.beat_id,
        }
    except SceneVersionConflictError as exc:
        return {
            "success": False,
            "error": "scene_version_conflict",
            "message": str(exc),
            "batch_id": exc.batch_id,
            "expected_version": exc.expected_version,
            "current_version": exc.actual_version,
        }
    except RerollPreflightError as exc:
        return {"success": False, "error": exc.error_code, "message": str(exc)}

    if dry_run:
        # Read-only (REC-100): estimate only — never clear primaries or dispatch.
        runner = EpisodeRunner(
            project=project,
            plan={},
            episode=episode_str,
            budget_usd=budget_usd,
            step_runner=None,
            strategy_engine=None,
        )
        return {
            "success": True,
            "dry_run": True,
            "batch": batch,
            "grouping": selector.strategy,
            "estimated_cost_usd": round(runner._estimate_take_cost(beat), 4),
        }

    step_runner = _build_step_runner_for_episode(project, episode)
    runner = EpisodeRunner(
        project=project,
        plan={},
        episode=episode_str,
        budget_usd=budget_usd,
        step_runner=step_runner,
        strategy_engine=None,
    )
    # Estimated before dispatch; reported on the success payload.
    estimated_cost = runner._estimate_take_cost(beat)
    try:
        # Board gate pre-scan BEFORE prepare_beat_for_reroll (which mutates +
        # persists scene state) — a gate-blocked reroll must write nothing.
        _preflight_board_gate(project=project, episode=episode, beats=[beat])
        prep = runner.prepare_beat_for_reroll(
            scene, beat, expected_version=expected_version
        )
        asyncio.run(
            runner.run_scene(
                scene,
                dry_run=False,
                force_new_take=True,
                reroll_beat_id=prep["beat_id"],
                seed=seed,
                make_primary=make_primary,
                strategy_override=strategy,
                reroll_note=None,
                allow_cleared_stale=True,
                expected_version=expected_version,
            )
        )
    except RerollPreflightError as exc:
        return {"success": False, "error": exc.error_code, "message": str(exc)}
    except BudgetExhaustedError as exc:
        return {
            "success": False,
            "error": "budget_exhausted",
            "message": str(exc),
            "beat_id": exc.beat_id,
            "spent_usd": exc.spent,
            "budget_usd": budget_usd,
        }
    except BoardGateError as exc:
        return {
            "success": False,
            "error": "board_gate_blocked",
            "reason": exc.reason,
            "message": str(exc),
            "beat_id": exc.beat_id,
        }
    except SceneVersionConflictError as exc:
        return {
            "success": False,
            "error": "scene_version_conflict",
            "message": str(exc),
            "batch_id": exc.batch_id,
            "expected_version": exc.expected_version,
            "current_version": exc.actual_version,
        }

    # The freshly dispatched take is read back as the last take on the beat.
    new_take = beat.takes[-1] if beat.takes else None
    succeeded = new_take is not None and getattr(new_take, "status", None) == "succeeded"
    # take_number is 1-based; fall back to the prepared index when no take exists.
    take_number = (
        new_take.take_index + 1 if new_take is not None
        else prep["next_take_index"] + 1
    )
    if not succeeded:
        # Fail-closed parity with POST /reroll: a failed/missing take is an
        # explicit error, never a dispatched record.
        return {
            "success": False,
            "error": "dispatch_failed",
            "message": "reroll dispatched but the new take did not succeed",
            "batch": batch,
            "beat_id": prep["beat_id"],
            "take_number": take_number,
            "take_status": getattr(new_take, "status", "missing"),
            "dispatched": [],
        }
    return {
        "success": succeeded,
        "batch": batch,
        "grouping": selector.strategy,
        "dispatched": [
            {
                "beat_id": prep["beat_id"],
                "take_number": take_number,
                "batch_file": path.name,
            }
        ],
        "budget_usd": budget_usd,
        "estimated_cost_usd": round(estimated_cost, 4),
    }


def _run_scene_lock(
    *,
    project: str,
    episode: int,
    batch: str,
    reason: str | None,
    unlock: bool = False,
) -> tuple[int, dict]:
    """Lock or unlock a persisted scene selected by --batch.

    Returns ``(exit_code, payload)``: ``EXIT_VALIDATION`` with an error payload
    for an invalid selector or a missing scene, otherwise ``EXIT_OK`` with the
    resulting lock fields.
    """
    scene_id, error = _scene_id_from_batch_for_lock(batch, episode)
    if error is not None:
        return EXIT_VALIDATION, error

    episode_str = f"ep_{episode:03d}"
    # REC-231 Phase 4: lock/unlock the ACTIVE version body (lock fields are excluded
    # from structure_fingerprint, so the status writer accepts them).
    try:
        scene, loaded_version = load_scene_active_with_version(
            project, episode_str, scene_id
        )
    except FileNotFoundError:
        missing_payload = {
            "success": False,
            "error": "batch_scene_missing",
            "message": f"No persisted scene for {batch}.",
            "path": str(active_scene_body_path(project, episode_str, scene_id)),
        }
        return EXIT_VALIDATION, missing_payload

    action = "unlock" if unlock else "lock"
    if unlock:
        lock_fields = {
            "locked": False,
            "lock_reason": None,
            "locked_by": None,
            "locked_at": None,
        }
    else:
        lock_fields = {
            "locked": True,
            "lock_reason": reason,
            "locked_by": _current_operator(),
            "locked_at": utc_now_iso8601(),
        }

    def _apply_lock(target: Scene) -> None:
        # Same values are applied to the in-memory scene and the saved body.
        for field_name, value in lock_fields.items():
            setattr(target, field_name, value)

    _apply_lock(scene)  # reflect in the returned in-memory scene
    save_active_scene_status(
        project,
        episode_str,
        scene_id,
        expected_version=loaded_version,
        mutate=_apply_lock,
    )
    payload = {
        "success": True,
        "action": action,
        "batch": batch,
        "scene_id": scene.scene_id,
        "path": str(active_scene_body_path(project, episode_str, scene_id)),
        "locked": scene.locked,
        "lock_reason": scene.lock_reason,
        "locked_by": scene.locked_by,
        "locked_at": scene.locked_at,
    }
    return EXIT_OK, payload


# ── Storyboard board CLI (REC-124 Phase 5) ────────────────────────

def _run_storyboard_build(
    *,
    project: str,
    episode: int,
    batch: str,
    dry_run: bool,
    auto_reroll: bool = False,
    max_board_attempts: int = 3,
) -> dict:
    """Build a proposed storyboard board for the batch named by ``batch``.

    Without ``auto_reroll`` the build goes through ``build_and_dispatch_board``
    (with no step runner when ``dry_run`` is set). With ``auto_reroll`` it goes
    through ``build_with_auto_reroll`` capped at ``max_board_attempts``; that
    mode cannot be combined with ``dry_run``.

    Returns:
        The builder's result dict, or a ``board_builder_error`` failure
        payload when a ``BoardBuilderError`` is raised along the way.
    """

    try:
        if not auto_reroll:
            # Dry runs are built without a step runner.
            runner = (
                None if dry_run
                else _build_step_runner_for_episode(project, episode)
            )
            return build_and_dispatch_board(
                project, episode, batch, step_runner=runner, dry_run=dry_run,
            )

        if dry_run:
            raise BoardBuilderError("--auto-reroll is incompatible with --dry-run")
        runner = _build_step_runner_for_episode(project, episode)
        return build_with_auto_reroll(
            project, episode, batch,
            step_runner=runner,
            max_attempts=max_board_attempts,
        )
    except BoardBuilderError as exc:
        failure = {"success": False, "error": "board_builder_error"}
        failure["message"] = str(exc)
        return failure


def _storyboard_result_exit_code(result: dict) -> int:
    """Translate a storyboard result payload into a process exit code.

    Only an explicit ``"success": False`` is a failure; a missing or ``None``
    ``success`` value maps to ``EXIT_OK``.
    """
    if result.get("success") is False:
        return EXIT_PARTIAL
    return EXIT_OK


def _run_board_only_rederive(
    project: str,
    episode: int,
    batch: str,
    *,
    dry_run: bool,
) -> int:
    """Rebuild only the board for ``batch``, print a report, return the exit code.

    Prints a one-line status, a dry-run summary (plus the estimated cost when
    the result carries ``estimated_cost_usd``) if ``dry_run`` is set, and then
    the full result as a JSON line.
    """
    outcome = _run_storyboard_build(
        project=project, episode=episode, batch=batch, dry_run=dry_run,
    )

    # Prefer the builder's own status; otherwise derive one from `success`.
    status = outcome.get("status")
    if not status:
        status = "failed" if outcome.get("success") is False else "ok"
    print(f"board-only: batch={batch} status={status}")

    if dry_run:
        summary = outcome.get("note") or outcome.get("summary") or "dry-run"
        print(f"board-only: dry-run summary: {summary}")
        if "estimated_cost_usd" in outcome:
            cost = float(outcome["estimated_cost_usd"])
            print(f"board-only: estimated cost ${cost:.2f}")

    print(_json_line(outcome))
    return _storyboard_result_exit_code(outcome)


def _auto_reroll_stop_block(project: str, result: dict) -> str | None:
    """Return the human stop-and-surface block for a stopped auto-reroll."""

    stopped_reason = result.get("stopped_reason")
    if not stopped_reason:
        return None

    verdict = _load_auto_reroll_verdict(project, result)
    route = (
        result.get("stopped_route")
        or ((verdict.get("routing") or {}).get("class") if isinstance(verdict, dict) else None)
        or (result.get("story_gate") or {}).get("route")
        or _last_lineage_route(result)
        or "unknown"
    )
    lines = [
        "StoryGate stop-and-surface",
        f"route: {route}",
        f"stopped_reason: {stopped_reason}",
        "failed checks:",
    ]
    failed = _failed_check_lines(verdict)
    if failed:
        lines.extend(f"- {line}" for line in failed)
    else:
        lines.append("- none recorded")

    history = _attempt_history_lines(result)
    if history:
        lines.append("attempt history:")
        lines.extend(f"- {line}" for line in history)

    if route == "script_problem":
        questions = _script_questions(verdict)
        lines.append("script questions:")
        if questions:
            lines.extend(f"- {question}" for question in questions)
        else:
            lines.append("- none recorded")
        lines.append("playbook: recoil/docs/DIRECTOR_NOTES_PLAYBOOK.md chain B")
    return "\n".join(lines)


def _load_auto_reroll_verdict(project: str, result: dict) -> dict:
    verdict = result.get("verdict") or result.get("_verdict")
    if isinstance(verdict, dict):
        return verdict
    verdict_path = result.get("stopped_verdict_path") or (
        result.get("story_gate") or {}
    ).get("verdict_path")
    if not verdict_path:
        return {}
    path = CoreProjectPaths.for_project(project).project_root / verdict_path
    try:
        loaded = json.loads(path.read_text(encoding="utf-8"))
    except Exception:  # noqa: BLE001 - stop block remains best-effort
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _last_lineage_route(result: dict) -> str | None:
    lineage = result.get("reroll_lineage") or []
    if not isinstance(lineage, list) or not lineage:
        return None
    last = lineage[-1]
    return last.get("route") if isinstance(last, dict) else None


def _attempt_history_lines(result: dict) -> list[str]:
    lineage = result.get("reroll_lineage") or []
    if not isinstance(lineage, list):
        return []
    lines: list[str] = []
    for entry in lineage:
        if not isinstance(entry, dict):
            continue
        lines.append(
            "attempt {attempt}: route={route}, hard={hard}, soft={soft}, artifact={artifact}".format(
                attempt=entry.get("attempt", "?"),
                route=entry.get("route") or "unknown",
                hard=entry.get("hard_fails", "?"),
                soft=entry.get("soft_fails", "?"),
                artifact=entry.get("artifact") or "unknown",
            )
        )
    return lines


def _failed_check_lines(verdict: dict) -> list[str]:
    if not isinstance(verdict, dict):
        return []
    lines: list[str] = []
    text_stageability = verdict.get("text_stageability")
    if isinstance(text_stageability, dict):
        for finding in text_stageability.get("findings", []) or []:
            if not isinstance(finding, dict) or finding.get("passed") is not False:
                continue
            label = f"text beat {finding.get('beat_index', '?')} {finding.get('check', 'check')}"
            lines.append(_format_failed_check(label, finding))

    for panel in verdict.get("panels", []) or []:
        if not isinstance(panel, dict):
            continue
        label = f"panel {panel.get('index', '?')}"
        lines.extend(_failed_forced_check_lines(label, panel))

    for transition in verdict.get("transitions", []) or []:
        if not isinstance(transition, dict):
            continue
        label = f"transition {transition.get('from', '?')}->{transition.get('to', '?')}"
        lines.extend(_failed_forced_check_lines(label, transition))
    return lines


def _failed_forced_check_lines(label: str, item: dict) -> list[str]:
    forced_checks = item.get("forced_checks") or {}
    if not isinstance(forced_checks, dict):
        return []
    lines: list[str] = []
    for check, entry in forced_checks.items():
        if isinstance(entry, dict) and entry.get("passed") is False:
            lines.append(_format_failed_check(f"{label} {check}", entry))
    return lines


def _format_failed_check(label: str, entry: dict) -> str:
    severity = entry.get("severity") or "unknown"
    reason = str(entry.get("reason") or "no reason recorded").strip()
    return f"{label} ({severity}): {reason}"


def _script_questions(verdict: dict) -> list[str]:
    if not isinstance(verdict, dict):
        return []
    text_stageability = verdict.get("text_stageability")
    if not isinstance(text_stageability, dict):
        return []
    questions: list[str] = []
    seen: set[str] = set()
    for finding in text_stageability.get("findings", []) or []:
        if not isinstance(finding, dict) or finding.get("passed") is not False:
            continue
        question = str(finding.get("suggested_script_question") or "").strip()
        if question and question not in seen:
            seen.add(question)
            questions.append(question)
    return questions


def _run_board_decision(
    *,
    project: str,
    episode: int,
    batch: str,
    decision: str,
    reason: str | None = None,
    route: str | None = None,
) -> tuple[int, dict]:
    """Approve or reject a proposed board after freshness/artifact checks.

    The flow is: validate the ``batch`` selector against ``episode``; load the
    active scene version; require exactly one ``r2v_multi`` beat that carries a
    board; refuse when the board's recorded ``source_sha256`` differs from the
    hash recomputed now or when the board artifact fails
    ``_validate_board_artifact``; persist the decision through
    ``save_active_scene_status``; append a label row (an append failure is
    logged, not fatal). On approve, a photoreal finish is then rendered unless
    the recorded ``photoreal_artifact`` file already exists.

    Args:
        project: Project name.
        episode: Episode number; must match the episode encoded in ``batch``.
        batch: Batch selector (``EP###_CONT_###`` or ``EP###_ONER_###``).
        decision: ``"approve"`` or ``"reject"``.
        reason: Stored as ``reason`` in the label row.
        route: Stored as ``route_hint`` in the label row.

    Returns:
        ``(exit_code, payload)``. Validation failures (including a
        ``ValueError`` raised while applying the decision, e.g. an unknown
        ``decision``) return ``EXIT_VALIDATION`` with an error dict; a failed
        finish returns ``EXIT_PARTIAL`` with an error dict; success returns
        ``EXIT_OK`` with the beat's board dict.

    Raises:
        RuntimeError: When, after the finish, the re-read scene no longer has
            exactly one ``r2v_multi`` beat carrying a board.
    """

    # Reject selectors that do not parse or that name a different episode.
    selector = parse_batch_selector(batch)
    if selector is None:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "invalid_batch_selector",
            "message": (
                f"{batch!r} is not a valid selector "
                "(expected EP###_CONT_### or EP###_ONER_###)."
            ),
        }
    if selector.episode != episode:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "invalid_batch_selector",
            "message": (
                f"selector episode EP{selector.episode:03d} does not match "
                f"--episode {episode}."
            ),
        }

    episode_str = f"ep_{episode:03d}"
    # REC-231 Phase 4: read the ACTIVE version body + version atomically; the board
    # decision's status writes below target that version.
    try:
        scene, loaded_version = load_scene_active_with_version(
            project, episode_str, selector.scene_id
        )
    except FileNotFoundError:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "batch_scene_missing",
            "message": f"No persisted scene for {batch}.",
            "path": str(active_scene_body_path(project, episode_str, selector.scene_id)),
        }

    # The decision targets the scene's single r2v_multi beat.
    r2v_beats = [
        beat for beat in scene.beats
        if (beat.beat_metadata or {}).get("modality") == "r2v_multi"
    ]
    if len(r2v_beats) != 1:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "new_take_requires_single_r2v_multi_beat",
            "message": "board decision requires exactly one r2v_multi target beat.",
        }
    beat = r2v_beats[0]

    if not beat.board:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "board_missing",
            "message": "no proposed board found; regenerate with --storyboard",
        }

    # Staleness check: the hash recorded on the board must equal the hash
    # recomputed from the current source.
    try:
        current_sha256 = _compute_board_source_sha256(project, batch, beat)
    except ValueError as exc:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "board_source_unavailable",
            "message": str(exc),
        }
    if current_sha256 != beat.board.get("source_sha256"):
        return EXIT_VALIDATION, {
            "success": False,
            "error": "board_stale",
            "message": "board stale (segments changed since generation) — regenerate with --storyboard",
        }

    artifact_error = _validate_board_artifact(project, beat.board)
    if artifact_error:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "board_artifact_invalid",
            "message": artifact_error,
        }
    # NOTE(review): the return value is unused and RuntimeError is not caught
    # here, whereas _run_revalidate_board converts it into a validation error —
    # presumably a deliberate hard failure; confirm.
    _require_board_shotset_hash(beat)

    # NOTE(review): the decision author is a fixed literal — confirm intended.
    approved_by = "JT"
    decided_at = utc_now_iso8601()

    # REC-231 Phase 4: the approve/reject decision (board status + scene lock — both
    # non-structural) is applied as a closure so it persists onto the ACTIVE version
    # body through the structure-guarded writer, AND mirrored onto the in-memory
    # scene for the downstream label/photoreal/result reads.
    def _apply_decision(s: Scene) -> None:
        b = next(
            bb for bb in s.beats
            if (bb.beat_metadata or {}).get("modality") == "r2v_multi"
        )
        if decision == "approve":
            # Skip approve_board when the board is already marked approved; the
            # lock fields are (re)written either way.
            if not (isinstance(b.board, dict) and b.board.get("status") == "approved"):
                b.approve_board(approved_by=approved_by)
            s.locked = True
            s.lock_reason = f"approved:{approved_by}"
            s.locked_by = approved_by
            s.locked_at = decided_at
        elif decision == "reject":
            b.reject_board(approved_by=approved_by)
        else:
            raise ValueError(f"unknown board decision: {decision}")

    # Mirror onto the in-memory scene first; a ValueError surfaces here, before
    # anything is written.
    try:
        _apply_decision(scene)
    except ValueError as exc:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "board_decision_invalid",
            "message": str(exc),
        }

    save_active_scene_status(
        project, episode_str, selector.scene_id,
        expected_version=loaded_version,
        mutate=_apply_decision,
        post_write=lambda saved: _stamp_board_ssot(
            project, episode, _single_r2v_board_beat(saved),
            scene_version=loaded_version,
        ),
    )
    # Record the decision as a label row; a failed append is logged below and
    # does not fail the command.
    label_row = {
        "schema_version": 1,
        "created_at": utc_now_iso8601(),
        "project": project,
        "episode": episode,
        "batch_id": batch,
        "artifact": beat.board.get("artifact"),
        "source_sha256": beat.board.get("source_sha256"),
        "decision": decision,
        "decision_by": approved_by,
        "reason": reason,
        "route_hint": route,
        "judge": beat.board.get("story_gate"),
    }
    try:
        append_label(
            CoreProjectPaths.for_project(project).episode_storyboards_dir(episode),
            label_row,
        )
    except IOError as exc:
        logger.warning("story gate label append failed for %s: %s", batch, exc)

    if decision == "approve":
        project_root = CoreProjectPaths.for_project(project).project_root
        photoreal = beat.board.get("photoreal_artifact") if beat.board else None
        if isinstance(photoreal, str) and photoreal:
            # A relative recorded path is resolved against the project root.
            photoreal_path = Path(photoreal).expanduser()
            if not photoreal_path.is_absolute():
                photoreal_path = project_root / photoreal_path
            if photoreal_path.is_file():
                print("photoreal finish: already exists; skipping")
                return EXIT_OK, beat.board
            # The recorded file is gone: drop the pointer in memory and in the
            # persisted scene before rendering a new finish.
            beat.board.pop("photoreal_artifact", None)

            def _pop_photoreal(s: Scene) -> None:
                b = next(
                    bb for bb in s.beats
                    if (bb.beat_metadata or {}).get("modality") == "r2v_multi"
                )
                if b.board is not None:
                    b.board.pop("photoreal_artifact", None)

            save_active_scene_status(
                project, episode_str, selector.scene_id,
                expected_version=loaded_version, mutate=_pop_photoreal,
            )

        print("photoreal finish: 1 × ~$0.41")
        # Both an exception and an unsuccessful result from the finish are
        # reported as EXIT_PARTIAL with a retry hint.
        try:
            finish = render_board_finish(
                project,
                episode,
                batch,
                step_runner=_build_step_runner_for_episode(project, episode),
                expected_version=loaded_version,
            )
        except Exception as exc:  # noqa: BLE001
            return EXIT_PARTIAL, {
                "success": False,
                "error": "board_finish_failed",
                "message": str(exc),
                "hint": "re-run --approve-board to retry the finish",
                "board": beat.board,
            }
        if not finish.get("success"):
            return EXIT_PARTIAL, {
                "success": False,
                "error": "board_finish_failed",
                "message": finish.get("error") or "storyboard finish dispatch failed",
                "hint": "re-run --approve-board to retry the finish",
                "board": beat.board,
            }

        # Re-read the scene after the finish and mirror the finish fields onto
        # the re-read beat, which becomes the returned board.
        refreshed = load_scene_active(project, episode_str, selector.scene_id)
        refreshed_beats = [
            item for item in refreshed.beats
            if (item.beat_metadata or {}).get("modality") == "r2v_multi"
        ]
        if len(refreshed_beats) != 1:
            raise RuntimeError("approved board target disappeared while persisting finish")
        refreshed_beat = refreshed_beats[0]
        if not refreshed_beat.board:
            raise RuntimeError("approved board disappeared while persisting finish")
        refreshed_beat.board["photoreal_artifact"] = finish["artifact"]
        for key in ("model", "provider", "fallback_from"):
            if key in finish:
                refreshed_beat.board[key] = finish[key]
        _require_board_shotset_hash(refreshed_beat)

        # Same finish fields, applied to the scene instance handed to the writer.
        def _apply_finish(s: Scene) -> None:
            b = next(
                bb for bb in s.beats
                if (bb.beat_metadata or {}).get("modality") == "r2v_multi"
            )
            if b.board is not None:
                b.board["photoreal_artifact"] = finish["artifact"]
                for key in ("model", "provider", "fallback_from"):
                    if key in finish:
                        b.board[key] = finish[key]

        save_active_scene_status(
            project, episode_str, selector.scene_id,
            expected_version=loaded_version,
            mutate=_apply_finish,
            post_write=lambda saved: _stamp_board_ssot(
                project, episode, _single_r2v_board_beat(saved),
                scene_version=loaded_version,
            ),
        )
        beat = refreshed_beat
    return EXIT_OK, beat.board


def _run_revalidate_board(
    *,
    project: str,
    episode: int,
    batch: str,
) -> tuple[int, dict]:
    """Re-bless a previously approved board after structural drift, with no spend.

    Validates the ``batch`` selector against ``episode``, loads the active scene
    version, requires exactly one ``r2v_multi`` beat, and looks up the board
    record for that beat's shotset hash. Only a record whose ``status`` is
    ``approved`` is accepted. The record is then refreshed with the recomputed
    ``source_sha256``, a new ``updated_at``, the loaded ``scene_version`` and
    fresh script-span provenance, its ``needs_revalidation`` flag is dropped,
    and the result is written to the scene and stamped via
    ``derivation_manifest.stamp_board``.

    Args:
        project: Project name.
        episode: Episode number; must match the episode encoded in ``batch``.
        batch: Batch selector (``EP###_CONT_###`` or ``EP###_ONER_###``).

    Returns:
        ``(EXIT_VALIDATION, error_dict)`` for any failed precondition,
        otherwise ``(EXIT_OK, {"revalidated": True, "spend": 0})``.
    """

    # Reject selectors that do not parse or that name a different episode.
    selector = parse_batch_selector(batch)
    if selector is None:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "invalid_batch_selector",
            "message": (
                f"{batch!r} is not a valid selector "
                "(expected EP###_CONT_### or EP###_ONER_###)."
            ),
        }
    if selector.episode != episode:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "invalid_batch_selector",
            "message": (
                f"selector episode EP{selector.episode:03d} does not match "
                f"--episode {episode}."
            ),
        }

    episode_str = f"ep_{episode:03d}"
    # REC-231 Phase 4: revalidate reads + re-blesses the ACTIVE version body.
    try:
        scene, loaded_version = load_scene_active_with_version(
            project, episode_str, selector.scene_id
        )
    except FileNotFoundError:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "batch_scene_missing",
            "message": f"No persisted scene for {batch}.",
            "path": str(active_scene_body_path(project, episode_str, selector.scene_id)),
        }

    # Revalidation targets the scene's single r2v_multi beat.
    r2v_beats = [
        beat for beat in scene.beats
        if (beat.beat_metadata or {}).get("modality") == "r2v_multi"
    ]
    if len(r2v_beats) != 1:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "new_take_requires_single_r2v_multi_beat",
            "message": "board revalidation requires exactly one r2v_multi target beat.",
        }
    beat = r2v_beats[0]

    # A RuntimeError from the shotset-hash helper is reported as a validation
    # error rather than propagated.
    try:
        shotset_hash_val = _require_board_shotset_hash(beat)
    except RuntimeError as exc:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "board_shotset_hash_missing",
            "message": str(exc),
        }

    # Revalidation needs an existing record that is already approved.
    record = derivation_manifest.get_board(project, episode, shotset_hash_val)
    if record is None:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "no_prior_approval",
            "message": "revalidate requires a prior approved board SSOT record.",
        }
    if record.get("status") != "approved":
        return EXIT_VALIDATION, {
            "success": False,
            "error": "not_previously_approved",
            "message": (
                "revalidate only re-blesses a previously APPROVED board; "
                "rejected/proposed records require --approve-board"
            ),
        }

    # Seed the in-memory board from the record before hashing;
    # _compute_board_source_sha256 reads fingerprint_version off beat.board
    # (presumably carried over by board_record_to_cache — confirm).
    beat.board = board_record_to_cache(record)
    try:
        current_sha256 = _compute_board_source_sha256(project, batch, beat)
    except ValueError as exc:
        return EXIT_VALIDATION, {
            "success": False,
            "error": "board_source_unavailable",
            "message": str(exc),
        }

    updated_record = {
        **record,
        "source_sha256": current_sha256,
        "updated_at": utc_now_iso8601(),
        # REC-231: stamp the active version this revalidation blessed, so the spend
        # discriminator (resolve_board_for_spend) version-checks it. A legacy/pre-REC-231
        # record revalidated here would otherwise stay versionless and skip the guard,
        # re-opening the cross-version board spend hole this patch closes.
        "scene_version": loaded_version,
    }
    # Refresh the script-span provenance for the shots the record covers.
    covered_shot_ids = list(updated_record.get("covered_shot_ids") or [])
    shot_script_spans, content_freshness_sha = _board_script_span_provenance(
        project,
        episode,
        covered_shot_ids,
    )
    updated_record["shot_script_spans"] = shot_script_spans
    updated_record["content_freshness_sha"] = content_freshness_sha
    updated_record.pop("needs_revalidation", None)
    beat.board = board_record_to_cache(updated_record)
    scene.locked = True

    # REC-231 Phase 4: re-bless the ACTIVE version body (board cache + lock are
    # non-structural) through the structure-guarded writer.
    def _apply_revalidate(s: Scene) -> None:
        b = next(
            bb for bb in s.beats
            if (bb.beat_metadata or {}).get("modality") == "r2v_multi"
        )
        b.board = board_record_to_cache(updated_record)
        s.locked = True

    # The refreshed record is stamped through the writer's post_write hook.
    save_active_scene_status(
        project, episode_str, selector.scene_id,
        expected_version=loaded_version,
        mutate=_apply_revalidate,
        post_write=lambda _saved: derivation_manifest.stamp_board(
            project,
            episode,
            shotset_hash_val,
            updated_record,
        ),
    )
    return EXIT_OK, {"revalidated": True, "spend": 0}


def _compute_board_source_sha256(project: str, batch: str, beat) -> str:
    """Recompute the board source hash for ``beat`` using the builder's primitive.

    SSOT: the primitive comes from the board builder itself. A locally-built
    PayloadContext passes plain dicts where the builder rebuilds CanonicalShot
    (_as_canonical_shot), so segment text differed and the staleness check
    false-fired on every board (live catch 2026-06-12).
    """
    from recoil.pipeline._lib.board_builder import _primitive_from_beat

    built = _primitive_from_beat(project, batch, beat)
    # A board without a recorded fingerprint_version is hashed as version 1.
    board = beat.board or {}
    fingerprint_version = board.get("fingerprint_version", 1)
    segments = getattr(built, "timing_segments", []) or []
    return compute_source_sha256(list(segments), version=fingerprint_version)


def _validate_board_artifact(project: str, board: dict) -> str | None:
    artifact = board.get("artifact")
    if not isinstance(artifact, str) or not artifact:
        return "board artifact missing"
    artifact_path = CoreProjectPaths.for_project(project).project_root / artifact
    if not artifact_path.is_file():
        return f"missing board artifact: {artifact_path}"
    size = artifact_path.stat().st_size
    if size < 1024:
        return f"board artifact too small ({size}B < 1KB): {artifact_path}"
    return None


# ── StoryGate calibration CLI (REC-135 Phase 7) ───────────────────

def _default_story_gate_labels_path() -> Path:
    """Return the path of the bundled seed-labels file under the recoil root."""
    return _RECOIL_ROOT.joinpath(
        "config",
        "story_gate_calibration",
        "tartarus_ep001_seed_labels.json",
    )


def _run_story_gate_eval(
    *,
    labels: Path,
    samples: int,
    tiers: list[str] | None,
    no_crops_compare: bool,
) -> tuple[int, dict, str]:
    """Run the StoryGate calibration and grade its aggregate.

    Args:
        labels: Labels file handed to ``run_calibration``.
        samples: Sample count handed to ``run_calibration``.
        tiers: Tiers to evaluate; a falsy value selects the two defaults.
        no_crops_compare: When set, only the crops-on mode is run; otherwise
            both crops-on and crops-off are run.

    Returns:
        ``(exit_code, report, table)``. The exit code is ``EXIT_OK`` only when
        the aggregate reports ``named_recall_or == 1.0``,
        ``schema_valid_rate == 1.0`` and ``judge_unavailable_count == 0``;
        otherwise it is ``EXIT_STORY_GATE_EVAL``.
    """
    projects_root = resolve_projects_root()
    selected_tiers = tuple(tiers or ("claude-opus-4-8", "claude-sonnet-4-6"))
    crops_modes = (True,) if no_crops_compare else (True, False)
    report = run_calibration(
        labels,
        projects_root,
        samples=samples,
        tiers=selected_tiers,
        crops_modes=crops_modes,
    )

    aggregate = report.get("aggregate") or {}
    required = {
        "named_recall_or": 1.0,
        "schema_valid_rate": 1.0,
        "judge_unavailable_count": 0,
    }
    passed = all(aggregate.get(key) == want for key, want in required.items())

    exit_code = EXIT_OK if passed else EXIT_STORY_GATE_EVAL
    return exit_code, report, _story_gate_eval_table(report)


def _story_gate_eval_table(report: dict) -> str:
    lines = ["", "StoryGate calibration", "case tier crops samples"]
    for entry in report.get("per_case", []):
        vector = entry.get("catch_vector") or []
        grid = "".join("✓" if bool(value) else "✗" for value in vector)
        if entry.get("skipped"):
            grid = f"SKIP: {entry.get('skip_reason')}"
        text = entry.get("text_stageability")
        if isinstance(text, dict):
            text_grid = "".join(
                "✓" if bool(value) else "✗"
                for value in (text.get("catch_vector") or [])
            )
            if text_grid:
                grid = f"{grid} text={text_grid}"
        lines.append(
            "{case_index} {tier} crops={crops} {grid}".format(
                case_index=entry.get("case_index"),
                tier=entry.get("tier"),
                crops=entry.get("crops_mode"),
                grid=grid,
            )
        )
    return "\n".join(lines)


# ── Signal handling ───────────────────────────────────────────────

def install_signal_handlers(loop) -> None:
    """Route SIGTERM and SIGINT to a cancellation request on ``loop``.

    ``loop`` must provide ``request_cancel()``. The EpisodeRunner-based live
    path does not currently use this (asyncio's task cancellation propagates
    naturally through ``asyncio.run()``); it is kept for callers that wrap a
    cancellation handle.
    """

    def _request_cancel(signum, frame):
        loop.request_cancel()
        logger.warning(
            "Cancellation requested (signal %s). Finishing current pass...",
            signum,
        )

    # Both signals share the one handler.
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, _request_cancel)


# ── Pass dict → CoveragePass converter ────────────────────────────

# ── Core generation function ──────────────────────────────────────

def run_generation(
    project: str,
    episode: int,
    pass_ids: list[str] | None = None,
    grouping: str = "auto",
    dry_run: bool = False,
    validate_only: bool = False,
    retry: bool = False,
    force_retry: bool = False,
    budget_usd: float = 25.0,
    force_new_take: bool = False,
    seed: int | None = None,
    make_primary: bool = False,
    batch: str | None = None,
    strategy: str | None = None,
    enforce_staleness_guard: bool = True,
) -> dict:
    """Run video generation for a project/episode via coverage passes.

    Stages, in order: flag-combination guards, --batch reroll delegation,
    validate-only short-circuit, retry preflight against PassStore,
    coverage-pass load and selection, validation, cost estimate, dry-run
    return, staleness guard, and finally the locked live dispatch through
    EpisodeRunner. Expected failures are reported as a dict with
    ``success=False`` and an ``error`` code rather than raised.

    Args:
        project: Project name (must exist under projects/).
        episode: Episode number (1-based).
        pass_ids: List of pass IDs to run. None = all passes. [] = ValueError
            (raised at pass selection, i.e. only when no earlier guard has
            already returned).
        grouping: Requested grouping; a falsy value is treated as "auto". It is
            resolved together with pass_ids by _resolve_grouping.
        dry_run: If True, validate + estimate cost without generating. No lockfile.
        validate_only: If True, run validation only and return results. No lockfile.
        retry: If True, check PassStore status before running; abort if missing/completed/in-flight.
        force_retry: If True, implies retry but permits a missing local PassStore record.
        budget_usd: Budget cap for live runs.
        force_new_take: If True, run as a new-take reroll. Requires exactly one
            selected pass, or a batch selector.
        seed: Optional seed forwarded to the runner. Requires force_new_take.
        make_primary: Forwarded to the runner. Requires force_new_take.
        batch: Batch selector. Requires force_new_take; the run is delegated to
            _run_batch_reroll.
        strategy: Author strategy name. Requires force_new_take and must be
            registered in AUTHOR_STRATEGIES for the "r2v_multi" modality.
        enforce_staleness_guard: If True (default), hard-refuse before dispatch when
            the locked coverage_passes were built against a stale plan structural_sha
            (D3 consumer guard). The CLI's --no-staleness-guard sets this False.

    Returns:
        Dict with 'success' key and additional context. JSON-serializable.

    Raises:
        ValueError: If pass_ids is an empty list and execution reaches pass
            selection.
    """
    # A falsy grouping falls back to "auto" before resolution.
    requested_grouping = grouping or "auto"
    resolved_grouping = _resolve_grouping(requested_grouping, pass_ids)
    # True only when grouping resolved to coverage AND explicit pass ids were given.
    coverage_scope = resolved_grouping == "coverage" and pass_ids is not None
    effective_retry = retry or force_retry
    logger.info(
        "generation grouping requested=%s resolved=%s scope=%s",
        requested_grouping,
        resolved_grouping,
        "selected_passes" if pass_ids is not None else "all",
    )

    # ── Flag-combination guards: each returns a structured error dict ──
    # Without --batch, a new take needs exactly one pass id.
    if force_new_take and not batch and (not pass_ids or len(pass_ids) != 1):
        return {
            "success": False,
            "error": "new_take_requires_single_pass",
            "message": "--new-take requires exactly one pass via --pass or a --batch selector.",
        }
    if batch and not force_new_take:
        return {
            "success": False,
            "error": "flag_requires_new_take",
            "message": "--batch requires --new-take.",
        }
    if effective_retry and batch:
        return {
            "success": False,
            "error": "reroll_non_coverage_deferred",
            "message": "--retry is incompatible with --batch; retry semantics "
                       "for batch rerolls are deferred to REC-111 follow-up.",
        }
    # Deferral stays for --retry and non-batch non-coverage rerolls; the --batch
    # continuity/oner reroll path (REC-111) is now legal and handled below.
    if (effective_retry or (force_new_take and not batch)) and (
        not coverage_scope or requested_grouping not in ("auto", "coverage")
    ):
        return {
            "success": False,
            "error": "reroll_non_coverage_deferred",
            "grouping": resolved_grouping,
            "message": (
                "--retry and --new-take currently require "
                "--grouping auto|coverage with --pass <id> (or a --batch "
                "selector for --new-take); general non-coverage reroll is "
                "deferred to REC-111."
            ),
        }
    if (seed is not None or make_primary or strategy is not None) and not force_new_take:
        return {
            "success": False,
            "error": "flag_requires_new_take",
            "message": "--seed, --make-primary, and --strategy require --new-take.",
        }
    if strategy is not None:
        # Iterating AUTHOR_STRATEGIES yields 3-tuples; only entries whose second
        # element is "r2v_multi" contribute their third element (the name).
        registered_strategies = {
            name for (_m, _modality, name) in AUTHOR_STRATEGIES
            if _modality == "r2v_multi"
        }
        if strategy not in registered_strategies:
            return {
                "success": False,
                "error": "unknown_author_strategy",
                "message": (
                    f"--strategy {strategy!r} is not a registered author strategy "
                    f"({sorted(registered_strategies)})."
                ),
            }
    # Batch rerolls are handled entirely by _run_batch_reroll; nothing below
    # runs for them.
    if batch:
        return _run_batch_reroll(
            project=project,
            episode=episode,
            batch=batch,
            strategy=strategy,
            seed=seed,
            make_primary=make_primary,
            budget_usd=budget_usd,
            dry_run=dry_run,
        )

    # 0. validate_only: run validation and return immediately — no generation
    if validate_only:
        if requested_grouping not in ("auto", "coverage"):
            return {
                "success": False,
                "error": "validate_requires_coverage_grouping",
                "grouping": requested_grouping,
                "message": "--validate validates coverage plans only.",
            }
        paths = ProjectPaths.for_episode(project, episode)
        # FileNotFoundError is a subclass of OSError; both handlers produce the
        # same error payload.
        try:
            loaded_passes_dicts = _load_coverage_pass_dicts(paths, episode)
        except FileNotFoundError as exc:
            return _coverage_passes_file_error(paths, episode, exc)
        except (json.JSONDecodeError, OSError) as exc:
            return _coverage_passes_file_error(paths, episode, exc)

        try:
            coverage_pass_objects = [CoveragePass.from_dict(d) for d in loaded_passes_dicts]
        except Exception as exc:
            return {"success": False, "error": "passes_parse_error", "message": str(exc)}

        # Every pass in the file is validated here; pass_ids is not applied.
        validation_results = validate_all_passes(coverage_pass_objects)
        blocks = [
            {"pass_id": r.pass_id, "check": r.check, "message": r.message}
            for r in validation_results if r.severity == Severity.BLOCK
        ]
        warnings = [
            {"pass_id": r.pass_id, "check": r.check, "message": r.message}
            for r in validation_results if r.severity == Severity.WARN
        ]
        for b in blocks:
            logger.error("BLOCK [%s] %s: %s", b["pass_id"], b["check"], b["message"])
        for w in warnings:
            logger.warning("WARN [%s] %s: %s", w["pass_id"], w["check"], w["message"])
        no_blocks = len(blocks) == 0
        return {
            "success": no_blocks,
            "validation_only": True,
            "grouping": "coverage",
            "blocks": blocks,
            "warnings": warnings,
        }

    # 0b. retry: check PassStore status before proceeding
    if effective_retry:
        if not pass_ids or len(pass_ids) != 1:
            return {
                "success": False,
                "error": "retry_requires_single_pass",
                "message": "--retry requires exactly one pass ID via --pass.",
            }
        store = PassStore(project)
        record = store.get_pass(pass_ids[0])
        if record is None:
            # A missing record is refused unless --force-retry was given.
            if not force_retry:
                return {
                    "success": False,
                    "error": "retry_record_missing",
                    "pass_id": pass_ids[0],
                    "message": (
                        f"no PassStore record for {pass_ids[0]} on this machine — "
                        "the peer's write may be unsynced or in a Dropbox "
                        "conflicted copy; verify sync, or re-run with "
                        "--force-retry to proceed"
                    ),
                }
        else:
            status = record.get("status", "")
            if status == "completed":
                return {
                    "success": False,
                    "error": "already_completed",
                    "pass_id": pass_ids[0],
                }
            elif status == "generating":
                return {
                    "success": False,
                    "error": "orphaned_in_flight",
                    "pass_id": pass_ids[0],
                    "message": (
                        "Pass is in 'generating' status — either another instance is running "
                        "(check lockfile) or a prior run crashed. Delete the PassStore record "
                        "or the lockfile to retry."
                    ),
                }
        # status is None, "failed", or "pending" → proceed with normal run

    paths = ProjectPaths.for_episode(project, episode)
    # The pass file is needed for coverage grouping, and whenever explicit pass
    # ids were supplied (to derive the selected subset).
    needs_coverage_pass_file = resolved_grouping == "coverage" or pass_ids is not None
    loaded_passes_dicts: list[dict] = []
    coverage_pass_objects: list[CoveragePass] = []
    selected_dicts: list[dict] = []
    selected_pass_objects: list[CoveragePass] = []

    if needs_coverage_pass_file:
        try:
            loaded_passes_dicts = _load_coverage_pass_dicts(paths, episode)
        except FileNotFoundError as exc:
            return _coverage_passes_file_error(paths, episode, exc)
        except (json.JSONDecodeError, OSError) as exc:
            return _coverage_passes_file_error(paths, episode, exc)

        # SCOPE NOTE (REC-72 D0b / Phase 5): the prose_verify BLOCK gate added to
        # coverage_validator.validate_pass flows through validate_all_passes for
        # resolved coverage runs. Non-coverage selected-pass runs only use the
        # pass file to derive a shot subset, then reassemble with the requested
        # strategy.
        try:
            coverage_pass_objects = [
                CoveragePass.from_dict(d) for d in loaded_passes_dicts
            ]
        except Exception as exc:
            return {"success": False, "error": "passes_parse_error", "message": str(exc)}

        if pass_ids is not None:
            if pass_ids == []:
                raise ValueError("Empty pass_ids list; pass None for all.")
            # Every requested id must exist in the file; otherwise report the
            # unmatched ids and stop.
            available_ids = {d.get("pass_id") for d in loaded_passes_dicts}
            unmatched = [pid for pid in pass_ids if pid not in available_ids]
            if unmatched:
                return {
                    "success": False,
                    "error": "pass_filter_unmatched",
                    "unmatched": unmatched,
                    "grouping": resolved_grouping,
                }
            # Selection keeps the file's order, not the order of pass_ids.
            selected_dicts = [
                d for d in loaded_passes_dicts if d.get("pass_id") in pass_ids
            ]
            selected_ids = set(pass_ids)
            selected_pass_objects = [
                p for p in coverage_pass_objects if p.pass_id in selected_ids
            ]
        else:
            selected_dicts = loaded_passes_dicts
            selected_pass_objects = coverage_pass_objects

    # Re-check after selection: a new take must resolve to exactly one pass dict
    # and exactly one pass object.
    if force_new_take and (
        len(selected_dicts) != 1 or len(selected_pass_objects) != 1
    ):
        return {
            "success": False,
            "error": "new_take_requires_single_pass",
            "message": "--new-take requires exactly one pass via --pass.",
        }

    # New-take runs validate only the selected pass; other coverage runs
    # validate every loaded pass; non-coverage groupings skip validation.
    validation_results = (
        validate_all_passes(
            selected_pass_objects if force_new_take else coverage_pass_objects
        )
        if resolved_grouping == "coverage"
        else []
    )
    blocks = [
        {"pass_id": r.pass_id, "check": r.check, "message": r.message}
        for r in validation_results if r.severity == Severity.BLOCK
    ]
    warnings = [
        {"pass_id": r.pass_id, "check": r.check, "message": r.message}
        for r in validation_results if r.severity == Severity.WARN
    ]

    for b in blocks:
        logger.error("BLOCK [%s] %s: %s", b["pass_id"], b["check"], b["message"])
    for w in warnings:
        logger.warning("WARN [%s] %s: %s", w["pass_id"], w["check"], w["message"])

    if blocks:
        return {
            "success": False,
            "error": "validation_blocked",
            "grouping": resolved_grouping,
            "blocks": blocks,
            "warnings": warnings,
        }

    # 5. Cost estimate (used by both dry-run return + live-run summary)
    try:
        cost_per_second = get_provider_cost_per_second("seeddance-2.0")
    except KeyError:
        cost_per_second = 0.0

    if cost_per_second == 0.0:
        logger.warning(
            "cost_per_second missing from seeddance-2.0 profile; "
            "estimate will be $0.00"
        )

    estimated_cost = 0.0
    if selected_dicts:
        # Per pass: duration × takes × rate. Missing/zero duration counts as 0,
        # missing/zero takes_count counts as 1.
        for d in selected_dicts:
            duration_s = d.get("duration_s", 0) or 0
            takes = d.get("takes_count") or 1
            estimated_cost += duration_s * takes * cost_per_second
    else:
        # No selected pass dicts: estimate from the canonical plan's shot
        # durations instead. Best-effort — a load failure yields $0.00.
        episode_str = f"ep_{episode:03d}"
        try:
            estimate_plan = _load_selected_canonical_plan(
                paths=paths,
                episode_str=episode_str,
                pass_ids=None,
                selected_dicts=[],
            )
            estimated_cost = sum(
                float(shot.duration_s or 0.0) for shot in estimate_plan.shots
            ) * cost_per_second
        except (FileNotFoundError, ValueError, OSError):
            estimated_cost = 0.0

    if dry_run:
        if force_new_take:
            # Drive the runner in dry_run mode so a RerollPreflightError surfaces
            # as a structured error. The step runner is a stand-in namespace
            # exposing only video_dir.
            episode_str = f"ep_{episode:03d}"
            try:
                canonical_plan = _load_selected_canonical_plan(
                    paths=paths,
                    episode_str=episode_str,
                    pass_ids=pass_ids,
                    selected_dicts=selected_dicts,
                    force_new_take=force_new_take,
                )
                runner = EpisodeRunner(
                    project=project,
                    plan=canonical_plan.raw,
                    episode=episode_str,
                    budget_usd=budget_usd,
                    step_runner=SimpleNamespace(video_dir=paths.video_dir),
                    strategy_engine=None,
                )
                asyncio.run(
                    runner.run_episode_batches(
                        canonical_plan,
                        dry_run=True,
                        grouping=resolved_grouping,
                        selected_coverage_passes=(
                            selected_pass_objects
                            if resolved_grouping == "coverage" else []
                        ),
                        force_new_take=True,
                        seed=seed,
                        make_primary=make_primary,
                    )
                )
            except RerollPreflightError as exc:
                return {
                    "success": False,
                    "error": exc.error_code,
                    "message": str(exc),
                }
        return {
            "success": True,
            "dry_run": True,
            "grouping": resolved_grouping,
            "pass_count": len(selected_dicts),
            "estimated_cost_usd": round(estimated_cost, 4),
            "validation": {"warnings": warnings},
        }

    # 5b. D3 consumer staleness guard (REC-164 Phase 4): before any live dispatch,
    # hard-refuse coverage passes that were locked against a stale plan
    # structural_sha. Coverage-pass consumption only; REFUSES, never rebuilds.
    # --no-staleness-guard sets enforce_staleness_guard=False to bypass.
    if enforce_staleness_guard and needs_coverage_pass_file:
        staleness_error = _coverage_passes_staleness_error(paths, project, episode)
        if staleness_error is not None:
            return staleness_error

    # 6. Live run — acquire lockfile, construct EpisodeRunner, dispatch.
    lock_path = None
    lock_parent = None
    lock_parent_created = False
    completed_live_dispatch = False
    try:
        try:
            if force_new_take:
                # Record whether the passes dir existed before locking; the flag
                # is handed to _remove_transient_empty_lock_parent as `created`.
                lock_parent = CoreProjectPaths.from_root(paths.project_root).passes_dir
                lock_parent_created = not lock_parent.exists()
            lock_path = acquire_episode_lock(paths.project_root, episode)
        except RuntimeError as exc:
            return {"success": False, "error": "locked", "message": str(exc)}

        episode_str = f"ep_{episode:03d}"
        try:
            canonical_plan = _load_selected_canonical_plan(
                paths=paths,
                episode_str=episode_str,
                pass_ids=pass_ids if pass_ids is not None else None,
                selected_dicts=selected_dicts,
                force_new_take=force_new_take,
            )
        except RerollPreflightError as exc:
            return {
                "success": False,
                "error": exc.error_code,
                "message": str(exc),
            }

        # New-take runs open the store with migrate=False and do not create
        # video_dir here.
        store = ExecutionStore(project, migrate=not force_new_take)
        if not force_new_take:
            paths.video_dir.mkdir(parents=True, exist_ok=True)
        step_runner = StepRunner(store=store, paths=paths, episode=episode)
        if force_new_take:
            strategy_engine = None
        else:
            # REC-20: activate intelligent retry on this autonomous pass loop too.
            # Cold-start LearningEngine -> StrategyEngine falls through to static
            # ESCALATION_CHAINS, so this is strictly additive vs blind round-robin.
            strategy_engine = StrategyEngine(
                learning=LearningEngine(project=project),
                model="seeddance-2.0",
            )
        runner = EpisodeRunner(
            project=project,
            plan=canonical_plan.raw,
            episode=episode_str,
            budget_usd=budget_usd,
            step_runner=step_runner,
            strategy_engine=strategy_engine,
        )

        try:
            scenes = asyncio.run(runner.run_episode_batches(
                canonical_plan, dry_run=False,
                grouping=resolved_grouping,
                selected_coverage_passes=(
                    selected_pass_objects if resolved_grouping == "coverage" else []
                ),
                force_new_take=force_new_take,
                seed=seed,
                make_primary=make_primary,
                strategy_override=strategy,
            ))
            # For new-take runs the dispatch counts as completed only if some
            # beat's latest take carries force_new_take in its take_metadata;
            # for every other run, returning from the runner is enough.
            completed_live_dispatch = (
                any(
                    b.takes
                    and getattr(b.takes[-1], "take_metadata", {}).get("force_new_take")
                    for s in scenes
                    for b in s.beats
                )
                if force_new_take else True
            )
        except RerollPreflightError as exc:
            return {
                "success": False,
                "error": exc.error_code,
                "message": str(exc),
            }
        except SceneContentDriftError as exc:
            # REC-235: a same-topology shot.raw/script content change on a live run —
            # block before any paid dispatch; the operator must capture it via versioning.
            return {
                "success": False,
                "error": "scene_content_drift",
                "message": str(exc),
                "beat_id": exc.beat_id,
                "batch_id": exc.batch_id,
            }
        except SceneVersionConflictError as exc:
            return {
                "success": False,
                "error": "scene_version_conflict",
                "message": str(exc),
                "batch_id": exc.batch_id,
                "expected_version": exc.expected_version,
                "current_version": exc.actual_version,
            }
        except BudgetExhaustedError as exc:
            # A budget halt is an expected terminal state, not a crash —
            # emit the structured summary instead of a raw traceback
            # (REC-122: run 2 died with no summary JSON). Scene state was
            # already saved by run_episode_batches before the re-raise.
            return {
                "success": False,
                "error": "budget_exhausted",
                "message": str(exc),
                "beat_id": exc.beat_id,
                "spent_usd": exc.spent,
                "reserved_usd": exc.reserved,
                "budget_usd": budget_usd,
                "grouping": resolved_grouping,
            }
        finally:
            # Runs on success, on every handled error return above, and on
            # unhandled exceptions. Clearing lock_path tells the outer finally
            # the lock has already been released.
            release_episode_lock(lock_path)
            if force_new_take and not completed_live_dispatch:
                _remove_transient_empty_lock_parent(
                    lock_parent,
                    created=lock_parent_created,
                )
            lock_path = None

        # Translate the scenes list into the existing CLI return shape.
        if force_new_take:
            # Count only the latest take of each beat, and only when it is
            # flagged as a forced new take.
            result_takes = [
                b.takes[-1]
                for s in scenes
                for b in s.beats
                if b.takes
                and getattr(b.takes[-1], "take_metadata", {}).get("force_new_take")
            ]
        else:
            result_takes = [t for s in scenes for b in s.beats for t in b.takes]
        shots_succeeded = sum(
            1 for t in result_takes if getattr(t, "status", None) == "succeeded"
        )
        shots_failed = sum(
            1 for t in result_takes if getattr(t, "status", None) == "failed"
        )
        success = shots_failed == 0
        if force_new_take:
            # A new-take run that produced no new take is not a success.
            success = bool(result_takes) and success
        # REC-231: batches that staged a not_derived candidate (coverage-refresh /
        # topology-drift) dispatched NOTHING and require an operator re-board + rederive
        # --conform. They are absent from `scenes`, so without this a no-dispatch staging
        # run would report success=true/shots=0, masking the required action.
        staged_candidates = dict(
            getattr(runner, "_staged_candidates_without_dispatch", {}) or {}
        )
        if staged_candidates:
            success = False
        result = {
            "success": success,
            "shots_succeeded": shots_succeeded,
            "shots_failed": shots_failed,
            "scenes_completed": len(scenes),
            "grouping": resolved_grouping,
            "budget_usd": budget_usd,
            "estimated_cost_usd": round(estimated_cost, 4),
        }
        if staged_candidates:
            result["status"] = "candidates_staged"
            result["staged_candidates"] = staged_candidates
            result["requires_conform"] = True
            result["message"] = (
                f"{len(staged_candidates)} batch(es) staged a not_derived candidate "
                "without dispatch; re-board then `rederive --conform` to activate."
            )
        return result

    finally:
        # Belt-and-suspenders for the lock — if anything escaped before the
        # inner try/finally cleaned up.
        if lock_path is not None:
            release_episode_lock(lock_path)
        if force_new_take and not completed_live_dispatch:
            _remove_transient_empty_lock_parent(
                lock_parent,
                created=lock_parent_created,
            )


# ── Edit pass (krea2-flora Phase 4) ───────────────────────────────

def run_edit_pass(
    project: str,
    episode: int,
    shot_id: str,
    source_frame: str,
    model: str = "seedream-v4.5",
    prompt: str = "",
    aspect_ratio: str = "9:16",
) -> dict:
    """Re-render an existing frame through ``StepRunner.execute_keyframe``.

    krea2-flora Phase 4: the manual, on-demand edit pass used for both
    drift-repair and previz-restyle. The existing frame is handed to
    ``execute_keyframe(source_frame=...)``. Downstream handling is not visible
    here; per the design notes the frame becomes the i2i/is2i base image and
    the project's bound Look/Identity is applied by ``execute_keyframe``.

    Steps performed:
      1. Reject a ``source_frame`` that is not an existing file.
      2. Build the episode-scoped ``ProjectPaths``, an ``ExecutionStore`` and
         a ``StepRunner``, and ensure ``frames_dir`` exists. This mirrors the
         wiring used by ``run_generation``.
      3. Unless ``RECOIL_EDIT_PASS_LIVE`` equals ``"1"``, return a deferred
         result describing the call that would be made.
      4. Otherwise call ``execute_keyframe`` and summarize its result.

    Args:
        project: Project name (directory under projects/).
        episode: Episode number (1-based); scopes the ProjectPaths.
        shot_id: Shot identifier the re-rendered frame belongs to.
        source_frame: Path to the existing frame; ``~`` is expanded.
        model: Image model id (default ``seedream-v4.5``).
        prompt: Optional edit/restyle prompt.
        aspect_ratio: Aspect ratio (default ``9:16``).

    Returns:
        Dict with a 'success' key and context. JSON-serializable.

    SPEND POLICY: paid generation is deferred by default. Step 2 still runs
    in deferred mode, so the frames directory and store are set up even when
    nothing is dispatched.
    """
    frame_path = Path(source_frame).expanduser()
    if not frame_path.is_file():
        return {
            "success": False,
            "error": "source_frame_missing",
            "path": str(frame_path),
        }

    # Same wiring as run_generation: episode-scoped paths + ExecutionStore.
    episode_paths = ProjectPaths.for_episode(project, episode)
    execution_store = ExecutionStore(project)
    episode_paths.frames_dir.mkdir(parents=True, exist_ok=True)
    keyframe_runner = StepRunner(
        store=execution_store,
        paths=episode_paths,
        episode=episode,
    )
    keyframe_runner._dispatch_path = "edit_pass_cli"

    # SPEND-GATE: only a supervised session with RECOIL_EDIT_PASS_LIVE=1 may
    # dispatch the paid generation.
    live_enabled = os.environ.get("RECOIL_EDIT_PASS_LIVE") == "1"
    if live_enabled:
        outcome = keyframe_runner.execute_keyframe(
            shot_id=shot_id,
            prompt=prompt,
            model=model,
            aspect_ratio=aspect_ratio,
            source_frame=frame_path,
        )
        return {
            "success": bool(getattr(outcome, "success", False)),
            "edit_pass": True,
            "deferred": False,
            "shot_id": shot_id,
            "model": model,
            "output_path": getattr(outcome, "output_path", None),
            "cost_usd": getattr(outcome, "cost_usd", 0.0),
            "final_state": getattr(outcome, "final_state", ""),
            "error": getattr(outcome, "error", None),
        }

    # Deferred: report the call shape the live path would use.
    deferred_message = (
        "Edit pass wired through StepRunner.execute_keyframe "
        "(source_frame → UnifiedVideoPayload.image → Flora i2i/is2i). "
        "Paid generation DEFERRED per spend policy. "
        "Set RECOIL_EDIT_PASS_LIVE=1 to run live."
    )
    return {
        "success": True,
        "edit_pass": True,
        "deferred": True,
        "reason": "spend_gate",
        "message": deferred_message,
        "call_shape": {
            "method": "StepRunner.execute_keyframe",
            "shot_id": shot_id,
            "model": model,
            "source_frame": str(frame_path),
            "aspect_ratio": aspect_ratio,
            "prompt_provided": bool(prompt),
        },
    }


# ── Concept / look-dev (krea2-flora Phase 5) ──────────────────────

# Krea's creativity dial — the valid rungs per the krea-2 model_profiles
# `params.creativity` enum ("raw|low|medium|high"). The concept CLI validates
# against this so a typo fails loud at arg-parse rather than at the paid call.
_KREA_CREATIVITY_LEVELS = ("raw", "low", "medium", "high")


def run_concept(
    prompt: str,
    creativity: str = "medium",
    aspect_ratio: str = "9:16",
    seed: int | None = None,
    look_id: str | None = None,
    out: str | None = None,
    model: str = "krea-2",
) -> dict:
    """Look-dev / concepting gen through Krea 2 (`t2i-krea-2-t2i`) in Flora.

    krea2-flora Phase 5 — the look-dev/concepting entry point. Krea 2 is NOT a
    hero render model: it is a pure text→image *concepting* tool whose outputs
    become Look ``style_refs``. This is an ISOLATED, OPTIONAL surface — if it
    misbehaves, parking it must not touch the production primitive. It exposes
    the Krea ``creativity`` dial (``raw|low|medium|high``), which no other model
    honors.

    The generated concept image is intended to be saved as a Look ``style_ref``:
    when ``--look`` is provided we resolve that Look from the Phase-1 registry so
    the operator knows exactly which Look the concept feeds, and ``--out``
    targets the on-disk ``style_ref`` location (under
    ``recoil/config/looks/<look_id>/``) the human will register the image at.

    Args:
        prompt: Concepting / look-dev prompt (the text→image instruction).
        creativity: Krea creativity dial — one of ``raw|low|medium|high``
            (default ``medium``, mirroring the noir_neon Look's authored value).
        aspect_ratio: Aspect ratio (default 9:16). Krea supports 1:1/9:16/16:9.
        seed: Optional integer seed for reproducible concepting rerolls.
        look_id: Optional Look id this concept feeds — resolved from the
            Phase-1 registry so the result records the intended style_ref owner.
        out: Optional output path the concept image saves to (the style_ref
            location the human registers it at).
        model: Flora concepting model id (default ``krea-2`` →
            ``t2i-krea-2-t2i``). Pinned to the Krea concepting profile.

    Returns:
        Dict with 'success' key and context. JSON-serializable.

    SPEND POLICY: the actual paid Krea generation is DEFERRED in this build.
    This function constructs the sanctioned concept call shape (the same dial +
    routing the live concepting gen will use) and returns a ``deferred`` result
    WITHOUT performing any paid call. Flip ``RECOIL_CONCEPT_LIVE=1`` in a
    supervised session to run the real Krea concepting gen.
    """
    if creativity not in _KREA_CREATIVITY_LEVELS:
        return {
            "success": False,
            "error": "invalid_creativity",
            "message": (
                f"creativity={creativity!r} not one of "
                f"{list(_KREA_CREATIVITY_LEVELS)} (Krea creativity dial)."
            ),
        }

    # Resolve the target Look from the Phase-1 registry (look-dev context): the
    # concept output is destined to become one of this Look's style_refs. A bad
    # look_id should surface here, NOT after a paid gen. A None look_id is fine
    # (freeform concepting not yet bound to a Look).
    resolved_look_id: str | None = None
    if look_id:
        try:
            from recoil.pipeline._lib.look_loader import load_registries
            looks, _ = load_registries()
        except Exception as exc:  # registry validation / load failure
            return {
                "success": False,
                "error": "look_registry_error",
                "message": str(exc),
            }
        if look_id not in looks:
            return {
                "success": False,
                "error": "look_not_found",
                "look_id": look_id,
                "message": (
                    f"look_id={look_id!r} not in the Look registry "
                    f"(known: {sorted(looks)})."
                ),
            }
        resolved_look_id = look_id

    # SPEND-GATE: the paid Krea concepting /generate is DEFERRED in this build.
    # We construct + return the sanctioned concept call shape (the same dial +
    # routing the live concepting gen will use) without dispatching a paid
    # generation. Flip RECOIL_CONCEPT_LIVE=1 in a supervised session to run the
    # real Krea concepting gen once cost-per-image is confirmed.
    call_shape = {
        "method": "StepRunner.execute_keyframe",
        "model": model,
        "flora_model_id": "t2i-krea-2-t2i",
        "modality": "image_t2i",
        "creativity": creativity,
        "aspect_ratio": aspect_ratio,
        "seed": seed,
        "prompt_provided": bool(prompt),
        "style_ref_target": out,
        "look_id": resolved_look_id,
    }

    if os.environ.get("RECOIL_CONCEPT_LIVE") != "1":
        return {
            "success": True,
            "concept": True,
            "deferred": True,
            "reason": "spend_gate",
            "message": (
                "Concept/look-dev gen wired through Krea 2 (t2i-krea-2-t2i) "
                "with the creativity dial; output is saveable as a Look "
                "style_ref. Paid generation DEFERRED per spend policy. "
                "Set RECOIL_CONCEPT_LIVE=1 to run live."
            ),
            "call_shape": call_shape,
        }

    # LIVE path (supervised only) — routes through the real pipeline. Krea 2 is
    # a t2i concepting model: no source frame, no refs (max_reference_images=0).
    # We wire it through StepRunner.execute_keyframe (the sanctioned image path),
    # passing the creativity dial via inputs_snapshot so the Flora adapter can
    # emit it. The output saves at `out` (the style_ref location) when provided.
    paths = ProjectPaths.for_episode("_concept", 0)
    store = ExecutionStore("_concept")
    paths.frames_dir.mkdir(parents=True, exist_ok=True)
    step_runner = StepRunner(store=store, paths=paths, episode=0)
    step_runner._dispatch_path = "concept_cli"
    result = step_runner.execute_keyframe(
        shot_id=f"concept_{resolved_look_id or 'freeform'}",
        prompt=prompt,
        model=model,
        aspect_ratio=aspect_ratio,
        inputs_snapshot={"creativity": creativity, "seed": seed},
    )
    output_path = getattr(result, "output_path", None)
    return {
        "success": bool(getattr(result, "success", False)),
        "concept": True,
        "deferred": False,
        "model": model,
        "creativity": creativity,
        "look_id": resolved_look_id,
        "output_path": output_path,
        "style_ref_target": out,
        "cost_usd": getattr(result, "cost_usd", 0.0),
        "final_state": getattr(result, "final_state", ""),
        "error": getattr(result, "error", None),
    }


# ── CLI entry point ───────────────────────────────────────────────

def _run_currency_check_cli(argv: list[str]) -> int:
    """Print a read-only derivation freshness report for one episode.

    Emits one ``FRESH:`` / ``STALE:`` line for each derivation stage
    (``plan``, ``coverage_passes``, ``scenes``) and for each row returned by
    ``derivation_manifest.board_freshness``; prints ``board: none`` when that
    call returns nothing.

    Args:
        argv: Command-line tokens following the ``currency-check`` subcommand.

    Returns:
        ``EXIT_PARTIAL`` if any stage or board row is stale, else ``EXIT_OK``.
    """
    cli = argparse.ArgumentParser(
        prog="generate.py currency-check",
        description=(
            "Check manifest derivation currency for plan, coverage passes, "
            "scenes, and approved board records. Read-only; no generation."
        ),
    )
    cli.add_argument(
        "--project", required=True,
        help="Project name (directory under projects/)",
    )
    cli.add_argument(
        "--episode", required=True, type=int,
        help="Episode number (1-based integer)",
    )
    opts = cli.parse_args(argv)

    def _report(label: str, is_fresh, broken_at, fallback: str) -> bool:
        """Print the status line for one item; return True when it is stale."""
        if is_fresh:
            print(f"FRESH: {label}")
            return False
        # When no break point is reported, the item itself is named.
        print(f"STALE: {label} (broken at {broken_at or fallback})")
        return True

    found_stale = False

    # Stage-level freshness, reported in derivation order.
    for stage_name in ("plan", "coverage_passes", "scenes"):
        is_fresh, broken_at = derivation_manifest.freshness(
            opts.project, opts.episode, stage_name
        )
        if _report(stage_name, is_fresh, broken_at, stage_name):
            found_stale = True

    # Board-level freshness: one row per shot-set hash.
    board_rows = derivation_manifest.board_freshness(opts.project, opts.episode)
    if not board_rows:
        print("board: none")
    for board_hash, is_fresh, broken_at in board_rows:
        if _report(f"board[{board_hash}]", is_fresh, broken_at, "board"):
            found_stale = True

    if found_stale:
        return EXIT_PARTIAL
    return EXIT_OK


def _run_concept_cli(argv: list[str]) -> int:
    """Parse and dispatch the `concept` subcommand (krea2-flora Phase 5).

    Look-dev / concepting through Krea 2 (``t2i-krea-2-t2i``) with the Krea
    ``creativity`` dial exposed; the output is meant to become a Look
    ``style_ref``. This surface is ISOLATED and OPTIONAL — it does NOT touch
    the production primitive. Like the edit-pass subcommand, the paid Krea
    generation is DEFERRED behind ``RECOIL_CONCEPT_LIVE=1``.

    Args:
        argv: Command-line tokens following the ``concept`` subcommand.

    Returns:
        ``EXIT_OK`` when ``run_concept`` reports ``success is True``;
        ``EXIT_VALIDATION`` for the ``invalid_creativity``,
        ``look_not_found`` and ``look_registry_error`` error codes;
        ``EXIT_PARTIAL`` for any other outcome.
    """
    cli = argparse.ArgumentParser(
        prog="generate.py concept",
        description=(
            "krea2-flora look-dev / concepting — Krea 2 (t2i-krea-2-t2i) "
            "text→image with the creativity dial. Outputs become Look "
            "style_refs (NOT a hero render; isolated + optional)."
        ),
    )

    # Declarative option table; registered in this order so --help is stable.
    option_specs = (
        (
            "--prompt",
            {
                "required": True,
                "help": "Concepting / look-dev prompt (the text→image instruction)",
            },
        ),
        (
            "--creativity",
            {
                "default": "medium",
                "choices": _KREA_CREATIVITY_LEVELS,
                "help": "Krea creativity dial (default: medium)",
            },
        ),
        (
            "--aspect",
            {
                "dest": "aspect_ratio",
                "default": "9:16",
                "help": "Aspect ratio (default 9:16; Krea supports 1:1/9:16/16:9)",
            },
        ),
        (
            "--seed",
            {
                "type": int,
                "default": None,
                "help": "Optional integer seed for reproducible concepting rerolls",
            },
        ),
        (
            "--look",
            {
                "dest": "look_id",
                "default": None,
                "metavar": "LOOK_ID",
                "help": "Optional Look id this concept feeds (resolved from the registry)",
            },
        ),
        (
            "--out",
            {
                "default": None,
                "metavar": "STYLE_REF_PATH",
                "help": "Optional output path (the style_ref location to save the concept)",
            },
        ),
        (
            "--model",
            {
                "default": "krea-2",
                "help": "Flora concepting model id (default: krea-2 → t2i-krea-2-t2i)",
            },
        ),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    opts = cli.parse_args(argv)

    # Logs go to stderr so stdout carries only the JSON result below.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
        stream=sys.stderr,
    )

    outcome = run_concept(
        prompt=opts.prompt,
        creativity=opts.creativity,
        aspect_ratio=opts.aspect_ratio,
        seed=opts.seed,
        look_id=opts.look_id,
        out=opts.out,
        model=opts.model,
    )
    print(json.dumps(outcome, indent=2, default=str))

    if outcome.get("success") is True:
        return EXIT_OK

    # Input / registry problems are validation failures; all else is partial.
    validation_errors = (
        "invalid_creativity",
        "look_not_found",
        "look_registry_error",
    )
    if outcome.get("error") in validation_errors:
        return EXIT_VALIDATION
    return EXIT_PARTIAL


def _run_from_script_rederive(
    project: str,
    episode: int,
    selector,
    *,
    dry_run: bool,
) -> int:
    """Heal one targeted scene batch from the live script (`rederive --from-script`).

    Flow, in order:
      1. Resolve the target scene against persisted state (``live_plan=None``).
      2. For ``dry_run``: describe the intended work and stop (nothing is locked
         or written).
      3. Acquire the episode lock (released in ``finally``).
      4. Re-run Camera Test + Plan Pass through ``IngestPipeline``.
      5. Run the coverage gate.
      6. Reload the plan from disk and re-resolve the target against it.
      7. Run a derive-only batch pass restricted to the target scene.
      8. Report whether the candidate version is new or already registered.

    Args:
        project: Project name.
        episode: Episode number; formatted as ``ep_NNN`` for path/manifest keys.
        selector: Parsed batch selector; this function reads ``selector.strategy``
            and passes the selector to ``resolve_from_script_target``.
        dry_run: When true, stop after the first target resolution.

    Returns:
        ``EXIT_OK`` on success, on dry-run, or when the post-upstream resolver
        reports ``not_stale``; ``EXIT_VALIDATION`` when a resolver refuses to
        proceed or a ``SceneIdentityMismatchError`` is raised; ``EXIT_LOCKED``
        when the episode lock cannot be acquired, the gate returns
        ``EXIT_LOCKED``, or the target scene was skipped; ``EXIT_PARTIAL`` for
        upstream failures, any other exception, or when the target was neither
        written nor skipped. A non-OK, non-LOCKED gate return code is passed
        through unchanged.
    """
    episode_str = f"ep_{episode:03d}"
    paths = ProjectPaths.for_episode(project, episode)
    core_paths = CoreProjectPaths.for_project(project)

    # First resolution runs before any upstream regeneration (no live plan yet).
    pre_verdict = resolve_from_script_target(
        project,
        episode,
        selector,
        live_plan=None,
    )
    if not pre_verdict.proceed:
        print(
            f"from-script: {pre_verdict.reason} target={pre_verdict.scene_id}: "
            f"{pre_verdict.message}"
        )
        return EXIT_VALIDATION

    target_scene_id = pre_verdict.scene_id
    if dry_run:
        # Dry-run returns before the lock is taken and before any stage runs.
        print(
            f"from-script: target={target_scene_id}; would attempt the non-$ "
            "upstream camera_test+plan heal + a targeted scene refresh "
            "(subject to the live post-upstream resolver — may be a no-op or "
            "escalate); no take, no board"
        )
        return EXIT_OK

    # lock_path stays None until the lock is held, so `finally` only releases
    # a lock this call actually acquired.
    lock_path: Path | None = None
    try:
        try:
            lock_path = acquire_episode_lock(paths.project_root, episode)
        except RuntimeError as exc:
            print(f"from-script: locked: {exc}", file=sys.stderr)
            return EXIT_LOCKED

        # Upstream heal: Camera Test, then Plan Pass.
        ip = IngestPipeline(project=project, project_root=paths.project_root)
        try:
            ip.run_camera_test(episode_num=episode)
            bible = ip._load_bible()
            plan_result = ip.run_storyboard_pass(episode, bible)
            if plan_result is None:
                print(
                    "from-script: Plan Pass returned no plan — aborting",
                    file=sys.stderr,
                )
                return EXIT_PARTIAL
            # A successful plan clears any earlier unresolved-location flag.
            derivation_manifest.clear_flag(project, episode, "location.unresolved")
        except Exception as exc:
            # Only unresolved-location failures are stamped on the manifest;
            # every upstream failure aborts with EXIT_PARTIAL.
            if isinstance(exc, LocationUnresolvedError):
                derivation_manifest.stamp_flag(
                    project,
                    episode,
                    "location.unresolved",
                    {"error": str(exc), "shot_index": exc.shot_index, "hint": exc.hint},
                )
            print(
                "from-script: Camera Test / Plan Pass failed validation "
                f"({exc}); sent to review queue — aborting",
                file=sys.stderr,
            )
            return EXIT_PARTIAL

        # Coverage gate: LOCKED means operator approval is pending; any other
        # non-OK code is returned to the caller as-is.
        try:
            gate_rc = _run_gate(project, episode)
        except BreakdownExtractError as exc:
            print(f"from-script: coverage gate failed: {exc}", file=sys.stderr)
            return EXIT_PARTIAL
        if gate_rc == EXIT_LOCKED:
            print(
                "from-script: coverage gate BLOCKED — approve the bible proposal "
                "then re-run"
            )
            return EXIT_LOCKED
        if gate_rc != EXIT_OK:
            return gate_rc

        # Second resolution: re-check the target against the plan now on disk.
        plan_path = core_paths.plans_dir / f"{episode_str}_plan.json"
        new_plan = load_plan(plan_path)
        post_verdict = resolve_from_script_target(
            project,
            episode,
            selector,
            live_plan=new_plan,
        )
        if not post_verdict.proceed:
            print(
                f"from-script: {post_verdict.reason} target={post_verdict.scene_id}: "
                f"{post_verdict.message}"
            )
            # "not_stale" is a successful no-op: upstream ran, nothing to write.
            if post_verdict.reason == "not_stale":
                print(
                    "from-script: upstream camera_test+plan heal completed; "
                    "target scene already reflects the current script; no "
                    "scene write, no take, no board"
                )
                return EXIT_OK
            return EXIT_VALIDATION

        store = ExecutionStore(project, migrate=True)
        step_runner = StepRunner(store=store, paths=paths, episode=episode)
        runner = EpisodeRunner(
            project=project,
            plan=new_plan.raw,
            episode=episode_str,
            budget_usd=25.0,
            step_runner=step_runner,
            strategy_engine=None,
        )
        from recoil.pipeline.core.persistence import load_manifest

        # Snapshot the integer version numbers registered BEFORE the run so the
        # report below can tell a fresh append from an already-known candidate.
        pre_manifest = load_manifest(project, episode_str, target_scene_id)
        pre_known_versions = {
            entry.get("version")
            for entry in (pre_manifest or {}).get("versions", [])
            if isinstance(entry.get("version"), int)
        }
        # Derive-only run restricted to the single target scene.
        result = asyncio.run(
            runner.run_episode_batches(
                new_plan,
                derive_only=True,
                dedup_candidate=True,
                dry_run=False,
                grouping=selector.strategy,
                only_scene_ids={target_scene_id},
            )
        )
    except SceneIdentityMismatchError as exc:
        # REC-231 Phase 6 identity-halt: the re-derive generated a DIFFERENT scene_id
        # than the requested target (a renumber/split) — HALT + report, never
        # auto-cascade. The store wrote nothing (no body, no manifest entry).
        print(
            f"from-script: IDENTITY HALT — {exc}; no scene written "
            "(a renumber is a new identity, never a version)",
            file=sys.stderr,
        )
        return EXIT_VALIDATION
    except Exception as exc:
        print(f"from-script: failed: {exc}", file=sys.stderr)
        return EXIT_PARTIAL
    finally:
        # Runs on every exit path above, including the early returns.
        if lock_path is not None:
            release_episode_lock(lock_path)

    # Result reporting happens after the lock has been released.
    written = set(result.get("written") or [])
    skipped = list(result.get("skipped") or [])
    if target_scene_id not in written:
        # A skip entry for the target is reported as a locked scene.
        locked_skip = next(
            (item for item in skipped if item.get("scene_id") == target_scene_id),
            None,
        )
        if locked_skip is not None:
            print(
                f"from-script: locked target={target_scene_id} "
                f"lock_reason={locked_skip.get('lock_reason') or 'unspecified'}; "
                "scene unchanged"
            )
            return EXIT_LOCKED
        print(
            f"from-script: target={target_scene_id} was not written; "
            f"written={sorted(written)} skipped={skipped}",
            file=sys.stderr,
        )
        return EXIT_PARTIAL

    # Three report shapes: candidate already registered before the run, a newly
    # appended numbered candidate, or a write with no version number reported.
    candidate_version = (result.get("candidate_versions") or {}).get(target_scene_id)
    if candidate_version in pre_known_versions:
        print(
            f"from-script: candidate already exists at version v{candidate_version}; "
            "conform it to make it active. No take dispatched; board unchanged."
        )
    elif candidate_version is not None:
        print(
            f"from-script: appended candidate version v{candidate_version} for "
            f"target scene {target_scene_id}; conform it to make it active. "
            "No take dispatched; board unchanged."
        )
    else:
        print(
            f"from-script: appended candidate version for target scene {target_scene_id}; "
            "conform it to make it active. No take dispatched; board unchanged."
        )
    print(
        "from-script: episode-level currency-check will still report scenes "
        "STALE because sibling scenes legitimately remain stale-vs-new-plan; "
        "run `rederive --board-only` after Brick 4 to render the board."
    )
    return EXIT_OK


def _print_scene_version_dailies(project: str, batch: str) -> None:
    """Emit the CLI dailies listing for one batch (REC-231 Phase 6).

    Prints a header with the active version, then one row per registered
    version carrying its shot-count delta relative to the active version. The
    row whose version equals the active pointer is marked with ``*``; the
    pointer alone decides which row is active, never recency. v1 dailies is
    CLI-only; rich visual dailies + REC-190 board-review integration are
    DEFERRED.

    Args:
        project: Project name passed to the scene-version read model.
        batch: Batch identifier passed to the scene-version read model.
    """
    from recoil.workspace.readmodel import get_scene_versions

    read_model = get_scene_versions(batch, project)
    pointer = read_model.active_version

    # Shot count of the first entry matching the pointer; None if none match.
    baseline_shots = None
    for candidate in read_model.versions:
        if candidate.version == pointer:
            baseline_shots = candidate.shot_count
            break

    print(
        f"dailies: batch={batch} active_version={pointer} "
        f"newer_unpointed_versions={read_model.newer_unpointed_versions}"
    )

    for entry in read_model.versions:
        # Without a baseline every row reports a zero delta.
        if baseline_shots is None:
            delta = 0
        else:
            delta = entry.shot_count - baseline_shots
        if entry.version == pointer:
            marker = "*"
        else:
            marker = " "
        print(
            f"  {marker} v{entry.version} state={entry.state} downstream={entry.downstream} "
            f"kind={entry.kind} shots={entry.shot_count} delta={delta:+d}"
        )


def _run_pointer_move_rederive(
    project: str,
    episode: int,
    selector,
    batch: str,
    *,
    to_version: int,
    verb: str,
    dry_run: bool = False,
) -> int:
    """REC-231 Phase 6 operator verb: move the active-version pointer, then print dailies.

    ``conform`` approves a candidate by pointing ``active_version`` at it;
    ``revert`` points back at a prior version. The move itself is delegated to
    ``SceneVersionStore`` — the SOLE mover of ``active_version``. Afterwards the
    CLI dailies surface is printed via the read model. Whole-scene only
    (per-shot conform is DEFERRED).

    With ``dry_run`` the store's validation helpers are run against the loaded
    manifest and nothing is moved.

    Args:
        project: Project name.
        episode: Episode number; formatted as ``ep_NNN`` for the store.
        selector: Parsed batch selector; ``selector.scene_id`` keys the store.
        batch: Batch string handed to the dailies printer.
            NOTE(review): the store is keyed by ``selector.scene_id`` while
            dailies use this raw string — confirm the two always agree.
        to_version: Version the pointer should move to.
        verb: ``"conform"`` selects ``store.conform``; anything else reverts.
        dry_run: Validate and report only.

    Returns:
        ``EXIT_VALIDATION`` when the store (or dry-run validation) raises
        ``ValueError`` / ``FileNotFoundError``; otherwise ``EXIT_OK``.
    """
    from recoil.pipeline.core.scene_version_store import SceneVersionStore
    from recoil.pipeline.core.persistence import load_manifest

    episode_str = f"ep_{episode:03d}"
    batch_id = selector.scene_id
    store = SceneVersionStore(project, episode_str)

    try:
        if not dry_run:
            # Real move: pick the store method matching the verb.
            move_pointer = store.conform if verb == "conform" else store.revert
            move_pointer(batch_id, to_version)
        else:
            manifest = load_manifest(project, episode_str, batch_id)
            if manifest is None:
                if verb == "conform":
                    problem = (
                        f"cannot conform batch {batch_id!r}: no versions manifest "
                        "(a conform presupposes a registered candidate)"
                    )
                else:
                    problem = f"cannot revert batch {batch_id!r}: no versions manifest"
                raise ValueError(problem)
            current_version = manifest["active_version"]
            # Same checks as a real move, without writing the pointer.
            store._require_version(manifest, batch_id, to_version)
            store._validate_pointer_target(batch_id, manifest, to_version)
    except (ValueError, FileNotFoundError) as exc:
        print(f"rederive --{verb}: {exc}", file=sys.stderr)
        return EXIT_VALIDATION

    if dry_run:
        summary = (
            f"rederive --{verb} dry-run project={project} episode={episode_str} "
            f"batch={batch_id} would move active_version "
            f"{current_version} -> {to_version} (validation=ok)"
        )
    else:
        summary = (
            f"rederive --{verb} project={project} episode={episode_str} "
            f"batch={batch_id} active_version={to_version}"
        )
    print(summary)
    _print_scene_version_dailies(project, batch)
    return EXIT_OK


def _run_rederive_cli(argv: list[str]) -> int:
    """argparse + STAGE 0/A/C dispatch for the `rederive` subcommand.

    Validates flag combinations, then dispatches to exactly one mode:

    * ``--conform`` / ``--revert`` → ``_run_pointer_move_rederive``
    * ``--board-only``             → ``_run_board_only_rederive``
    * ``--from-script``            → ``_run_from_script_rederive``
    * otherwise                    → whole-episode derive (Camera Test, Plan
      Pass, coverage gate, then a derive-only batch pass), followed by a
      learning-engine retry record unless ``--dry-run``.

    Args:
        argv: Command-line tokens following the ``rederive`` subcommand.

    Returns:
        For the whole-episode path: ``EXIT_OK`` on success; ``EXIT_PARTIAL``
        when ``--reuse-plan-cache`` finds a stale plan, an upstream stage
        fails, or any exception is raised during the derive; ``EXIT_LOCKED``
        when the coverage gate is blocked; any other non-OK gate code is
        passed through. For the targeted modes, the delegate's return code.

    Raises:
        SystemExit: via ``parser.error`` (exit status 2) for invalid flag
            combinations or a malformed ``--batch`` selector.
    """
    parser = argparse.ArgumentParser(
        prog="generate.py rederive",
        description=(
            "Re-derive episode scene batches without dispatching video takes. "
            "Runs Camera Test, Plan Pass, then grouping/cluster -> guarded scene write."
        ),
    )
    parser.add_argument(
        "--project",
        required=True,
        help="Project name (directory under projects/)",
    )
    parser.add_argument(
        "--episode",
        required=True,
        type=int,
        help="Episode number (1-based integer)",
    )
    parser.add_argument(
        "--grouping",
        default="auto",
        choices=("auto", "continuity", "coverage", "oner", "solo"),
        help="Grouping strategy to assemble (default: auto)",
    )
    parser.add_argument(
        "--from-script",
        action="store_true",
        help="Target one persisted scene batch and heal it from the live script.",
    )
    parser.add_argument(
        "--board-only",
        action="store_true",
        help="Target one persisted scene batch and re-render its board only.",
    )
    parser.add_argument(
        "--batch",
        metavar="EP###_CONT_###|EP###_ONER_###",
        help=(
            "Target selector for rederive --from-script / --board-only / "
            "--conform / --revert."
        ),
    )
    parser.add_argument(
        "--conform",
        action="store_true",
        help=(
            "REC-231 operator verb: approve a candidate by moving the active-version "
            "pointer to --to-version (requires --batch + --to-version)."
        ),
    )
    parser.add_argument(
        "--revert",
        action="store_true",
        help=(
            "REC-231 operator verb: move the active-version pointer back to a prior "
            "--to-version (requires --batch + --to-version)."
        ),
    )
    parser.add_argument(
        "--to-version",
        type=int,
        default=None,
        metavar="N",
        help="Target version for --conform/--revert (REC-231).",
    )
    parser.add_argument(
        "--skip-camera-test",
        action="store_true",
        help="Reuse the existing camera-tested episode instead of regenerating it.",
    )
    parser.add_argument(
        "--skip-plan",
        action="store_true",
        help=(
            "Reuse the on-disk plan cache WITHOUT a currency check; may silently "
            "run against a cache stale vs the script. DEPRECATED — prefer "
            "--reuse-plan-cache (currency-gated, fail-closed); retained for "
            "back-compat, tombstoned in Brick 2b."
        ),
    )
    parser.add_argument(
        "--skip-extract",
        action="store_true",
        help=(
            "Reuse the on-disk extract cache WITHOUT a currency check; may "
            "silently run against a cache stale vs the script. DEPRECATED — "
            "prefer --reuse-plan-cache (currency-gated, fail-closed); retained "
            "for back-compat, tombstoned in Brick 2b."
        ),
    )
    parser.add_argument(
        "--reuse-plan-cache",
        action="store_true",
        help=(
            "Reuse the on-disk plan only when the plan derivation manifest is "
            "current; fails closed when stale."
        ),
    )
    parser.add_argument(
        "--force-replan",
        action="store_true",
        help="Force a full Camera Test + Plan Pass + extract regenerate.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Report would-write/would-skip scene batches without writing scenes.",
    )
    parser.add_argument(
        "--reason",
        default="",
        help="Operator reason to persist with the replan retry event.",
    )
    parser.add_argument(
        "--strategy",
        default="",
        help="Operator strategy tag to persist with the replan retry event.",
    )
    args = parser.parse_args(argv)

    # ── Flag-combination validation ──────────────────────────────
    # Every failure below exits through parser.error (usage + exit status 2).
    selector = None
    if args.reuse_plan_cache and args.force_replan:
        parser.error("--reuse-plan-cache and --force-replan are mutually exclusive")
    if args.force_replan and (
        args.skip_camera_test or args.skip_plan or args.skip_extract
    ):
        parser.error(
            "--force-replan cannot be combined with --skip-* — it forces a full regenerate"
        )
    if args.reuse_plan_cache and (
        args.skip_camera_test or args.skip_plan or args.skip_extract
    ):
        parser.error(
            "--reuse-plan-cache cannot be combined with --skip-* — it owns the reuse decision"
        )
    if (args.reuse_plan_cache or args.force_replan) and (
        args.from_script or args.board_only or args.conform or args.revert
    ):
        parser.error(
            "--reuse-plan-cache/--force-replan are whole-episode derive flags "
            "and cannot be combined with --from-script/--board-only/--conform/--revert"
        )
    if args.from_script and args.board_only:
        parser.error(
            "--board-only and --from-script are separate operations — run "
            "them as two commands"
        )
    # REC-231 Phase 6: --conform/--revert are NEW pointer-move verbs taking
    # --batch <selector> --to-version N. They are mutually exclusive with each other
    # AND with --from-script/--board-only; --to-version is valid ONLY with them.
    if (args.conform or args.revert) and (args.from_script or args.board_only):
        parser.error(
            "--conform/--revert are pointer-move verbs and cannot be combined with "
            "--from-script/--board-only — run them as separate commands"
        )
    if args.conform and args.revert:
        parser.error("--conform and --revert are mutually exclusive")
    if (args.conform or args.revert) and not args.batch:
        parser.error("--conform/--revert requires --batch")
    if (args.conform or args.revert) and args.to_version is None:
        parser.error("--conform/--revert requires --to-version N")
    if args.to_version is not None and not (args.conform or args.revert):
        parser.error("--to-version is only valid with --conform or --revert")
    if args.from_script and not args.batch:
        parser.error("--from-script requires --batch")
    if args.board_only and not args.batch:
        parser.error("--board-only requires --batch")
    if args.batch and not (
        args.from_script or args.board_only or args.conform or args.revert
    ):
        # The message lists every verb the guard accepts (it previously
        # omitted --conform/--revert although they satisfy this check).
        parser.error(
            "rederive --batch requires --from-script, --board-only, "
            "--conform, or --revert"
        )
    if args.from_script and (
        args.skip_camera_test or args.skip_plan or args.skip_extract
    ):
        parser.error(
            "--from-script cannot be combined with --skip-* — it always heals "
            "from the live script"
        )
    if args.board_only and (
        args.skip_camera_test
        or args.skip_plan
        or args.skip_extract
        or bool(args.reason)
        or bool(args.strategy)
        or args.grouping != "auto"
    ):
        parser.error(
            "--board-only re-renders the board only; it takes no "
            "scene-rederive flags"
        )
    if args.from_script or args.board_only or args.conform or args.revert:
        # Targeted modes: the selector must parse and agree with --episode.
        selector = parse_batch_selector(args.batch)
        if selector is None:
            parser.error(
                f"invalid --batch selector {args.batch!r} "
                "(expected EP###_CONT_### or EP###_ONER_###)"
            )
        if selector.episode != args.episode:
            parser.error(
                f"--batch episode EP{selector.episode:03d} != "
                f"--episode {args.episode}"
            )

    # ── Targeted-mode dispatch ───────────────────────────────────
    if args.conform or args.revert:
        return _run_pointer_move_rederive(
            args.project,
            args.episode,
            selector,
            args.batch,
            to_version=args.to_version,
            verb="conform" if args.conform else "revert",
            dry_run=args.dry_run,
        )

    if args.board_only:
        return _run_board_only_rederive(
            args.project,
            args.episode,
            args.batch,
            dry_run=args.dry_run,
        )

    if args.from_script:
        return _run_from_script_rederive(
            args.project,
            args.episode,
            selector,
            dry_run=args.dry_run,
        )

    # ── Whole-episode derive ─────────────────────────────────────
    if args.reuse_plan_cache:
        # Currency-gated reuse: refuse a stale plan instead of silently using it.
        fresh, broken = derivation_manifest.freshness(args.project, args.episode, "plan")
        if not fresh:
            print(
                "rederive: --reuse-plan-cache refused stale plan cache "
                f"(broken at {broken or 'plan'}); drop --reuse-plan-cache or "
                "pass --force-replan to regenerate",
                file=sys.stderr,
            )
            return EXIT_PARTIAL
        # A current plan means Camera Test and Plan Pass can both be skipped.
        args.skip_camera_test = True
        args.skip_plan = True

    print(
        "COST: free CLI lane (Opus OAuth) — Camera Test + Plan Pass + extract "
        "+ cluster cost $0.00; zero scene-layer takes dispatched; boards + "
        "video are separate paid stages, not run here"
    )
    if args.dry_run and (not args.skip_camera_test or not args.skip_plan):
        print(
            "note: --dry-run still regenerates the camera-test + plan "
            "(Stages 0/2 have no dry-run write-guard); pair with "
            "--skip-camera-test --skip-plan for a fully read-only run"
        )

    project = args.project
    episode = args.episode
    episode_str = f"ep_{episode:03d}"
    project_root = resolve_projects_root() / project
    core_paths = CoreProjectPaths.for_project(project)
    # Prefer the legacy script location when it exists on disk.
    legacy_script_path = project_root / "episodes" / f"{episode_str}.md"
    script_path = (
        legacy_script_path
        if legacy_script_path.exists()
        else core_paths.episodes_dir / f"{episode_str}.md"
    )
    plan_path = ProjectPaths.for_episode(project, episode).plans_dir / f"{episode_str}_plan.json"
    # Hashes captured up front form the "before" side of the retry record.
    script_sha256 = _sha256_file(script_path)
    plan_sha256_before = _sha256_file(plan_path)
    stages_run: list[str] = []
    gate_status = "skipped" if args.skip_extract else ""
    plan = None

    try:
        if not args.skip_camera_test or not args.skip_plan:
            ip = IngestPipeline(project=project, project_root=project_root)
            try:
                if not args.skip_camera_test:
                    ip.run_camera_test(episode_num=episode)
                    stages_run.append("camera_test")
                if not args.skip_plan:
                    bible = ip._load_bible()
                    plan = ip.run_storyboard_pass(episode, bible)
                    stages_run.append("plan")
                    derivation_manifest.clear_flag(project, episode, "location.unresolved")
            except Exception as exc:
                # Only unresolved-location failures are stamped on the
                # manifest; every upstream failure aborts the derive.
                if isinstance(exc, LocationUnresolvedError):
                    derivation_manifest.stamp_flag(
                        project,
                        episode,
                        "location.unresolved",
                        {"error": str(exc), "shot_index": exc.shot_index, "hint": exc.hint},
                    )
                print(
                    "rederive: Camera Test / Plan Pass failed validation "
                    f"({exc}); sent to review queue — aborting",
                    file=sys.stderr,
                )
                return EXIT_PARTIAL
        # Plan Pass ran (not skipped) but produced nothing: cannot derive.
        if not args.skip_plan and plan is None:
            print(
                "rederive: Plan Pass returned no plan "
                "(dry-run or skipped) — aborting derive",
                file=sys.stderr,
            )
            return EXIT_PARTIAL

        if not args.skip_extract:
            try:
                gate_rc = _run_gate(project, episode)
            except BreakdownExtractError as exc:
                print(f"rederive: coverage gate failed: {exc}", file=sys.stderr)
                return EXIT_PARTIAL
            if gate_rc == EXIT_LOCKED:
                print(
                    "rederive: coverage gate BLOCKED — approve the bible proposal "
                    "(python3 recoil/pipeline/tools/breakdown_gate_cli.py --approve "
                    "<path>) then re-run with --skip-camera-test --skip-plan "
                    "--skip-extract"
                )
                return EXIT_LOCKED
            if gate_rc != EXIT_OK:
                return gate_rc
            gate_status = "clean"
            stages_run.append("gate")

        paths = ProjectPaths.for_episode(project, episode)
        # Always derive from the plan as persisted on disk, not the in-memory
        # Plan Pass return value.
        canonical_plan = load_plan(plan_path)

        store = ExecutionStore(project, migrate=True)
        step_runner = StepRunner(store=store, paths=paths, episode=episode)
        runner = EpisodeRunner(
            project=project,
            plan=canonical_plan.raw,
            episode=episode_str,
            budget_usd=25.0,
            step_runner=step_runner,
            strategy_engine=None,
        )

        resolved = _resolve_grouping(args.grouping, None)
        result = asyncio.run(
            runner.run_episode_batches(
                canonical_plan,
                derive_only=True,
                dry_run=args.dry_run,
                grouping=resolved,
            )
        )
        stages_run.append("cluster")
    except Exception as exc:
        print(f"rederive: failed: {exc}", file=sys.stderr)
        return EXIT_PARTIAL

    written = result["written"]
    skipped = result["skipped"]
    if not args.dry_run:
        # Persist an operator_replan retry event describing this run.
        learning = LearningEngine(project)
        learning.ingest_retry(
            event_kind="operator_replan",
            source="operator",
            granularity="episode",
            target_id=episode_str,
            stage_from="script",
            stage_to="scenes",
            stages_run=stages_run,
            strategy_applied=(
                args.strategy or "rederive_from_script_preserve_locked"
            ),
            succeeded=True,
            notes=args.reason or "",
            before={
                "script_sha256": script_sha256,
                "plan_sha256_before": plan_sha256_before,
            },
            after={
                "plan_sha256": _sha256_file(plan_path),
                "written_scenes": list(written),
                "gate_status": gate_status,
            },
            skipped_locked=list(skipped),
            outcome="pending",
            artifact_links={
                "script": str(script_path),
                "plan": str(plan_path),
            },
            linear_issue_ids=[],
        )
        learning.flush()
    print(
        f"rederive project={project} episode={episode_str} "
        f"written={len(written)} skipped={len(skipped)}"
    )
    for s in skipped:
        print(f"SKIP {s['scene_id']} (locked: {s['lock_reason']})")
    return EXIT_OK


def _run_edit_pass_cli(argv: list[str]) -> int:
    """Parse and dispatch the `edit-pass` subcommand (krea2-flora Phase 4).

    A sanctioned manual/on-demand re-render of an EXISTING frame through a
    Flora i2i/is2i image model, using the bound Look/Identity refs with the
    source frame as base. Follows the flat CLI's --project/--episode
    convention and routes through StepRunner.execute_keyframe (NOT a parallel
    generation path).

    Args:
        argv: Command-line tokens following the ``edit-pass`` subcommand.

    Returns:
        ``EXIT_OK`` when ``run_edit_pass`` reports ``success is True``;
        ``EXIT_VALIDATION`` when its error is ``source_frame_missing``;
        ``EXIT_PARTIAL`` otherwise.
    """
    cli = argparse.ArgumentParser(
        prog="generate.py edit-pass",
        description=(
            "krea2-flora reference-conditioned edit pass — re-render an "
            "EXISTING frame through Flora i2i/is2i with the bound Look/Identity "
            "refs + the source frame as base (drift-repair / previz-restyle)."
        ),
    )

    # Required targeting flags.
    cli.add_argument(
        "--project",
        required=True,
        help="Project name (directory under projects/)",
    )
    cli.add_argument(
        "--episode",
        required=True,
        type=int,
        help="Episode number (1-based integer)",
    )
    cli.add_argument(
        "--shot",
        dest="shot_id",
        required=True,
        metavar="SHOT_ID",
        help="Shot identifier the re-rendered frame belongs to",
    )
    cli.add_argument(
        "--source",
        dest="source_frame",
        required=True,
        metavar="FRAME_PATH",
        help="Path to the EXISTING frame to re-render (the i2i/is2i base)",
    )

    # Optional rendering knobs.
    cli.add_argument(
        "--model",
        default="seedream-v4.5",
        help="Flora image model id (default: seedream-v4.5 — registers is2i)",
    )
    cli.add_argument(
        "--prompt",
        default="",
        help="Optional edit/restyle prompt (bound Look's text rides on top)",
    )
    cli.add_argument(
        "--aspect",
        dest="aspect_ratio",
        default="9:16",
        help="Aspect ratio (default 9:16; bound Look's aspect wins when set)",
    )
    opts = cli.parse_args(argv)

    # Logs go to stderr so stdout carries only the JSON result below.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
        stream=sys.stderr,
    )

    outcome = run_edit_pass(
        project=opts.project,
        episode=opts.episode,
        shot_id=opts.shot_id,
        source_frame=opts.source_frame,
        model=opts.model,
        prompt=opts.prompt,
        aspect_ratio=opts.aspect_ratio,
    )
    print(json.dumps(outcome, indent=2, default=str))

    if outcome.get("success") is True:
        return EXIT_OK
    # A missing source frame is an input problem; anything else is partial.
    missing_source = outcome.get("error") == "source_frame_missing"
    return EXIT_VALIDATION if missing_source else EXIT_PARTIAL


def _build_main_arg_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the flat (subcommand-less) CLI.

    Arguments are registered in a fixed order because argparse renders
    ``--help`` in registration order; keep that order when adding flags.

    Returns:
        A configured ``argparse.ArgumentParser``. Nothing is parsed here.
    """
    # NOTE(review): `main()` also dispatches the `rederive` and
    # `currency-check` subcommands, but neither is listed in the usage or
    # epilog text below — confirm whether they should be advertised.
    parser = argparse.ArgumentParser(
        description="Recoil video generation via coverage passes.",
        usage=(
            "%(prog)s [options]  # see below for required flags\n"
            "       %(prog)s edit-pass --project P --episode N --shot S "
            "--source FRAME  # krea2-flora reference-conditioned edit pass\n"
            "       %(prog)s concept --prompt TEXT [--creativity LEVEL] "
            "[--look LOOK_ID]  # krea2-flora Krea 2 look-dev / concepting"
        ),
        epilog=(
            "subcommands:\n"
            "  edit-pass   Re-render an EXISTING frame through Flora i2i/is2i "
            "with the bound Look/Identity (drift-repair / previz-restyle).\n"
            "              Run `%(prog)s edit-pass --help` for its flags.\n"
            "  concept     Look-dev / concepting via Krea 2 (t2i-krea-2-t2i) "
            "with the creativity dial; outputs become Look style_refs.\n"
            "              Run `%(prog)s concept --help` for its flags."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Required args
    parser.add_argument(
        "--project",
        required=True,
        help="Project name (directory under projects/)",
    )
    parser.add_argument(
        "--episode",
        required=True,
        type=int,
        help="Episode number (1-based integer)",
    )

    # Mutually exclusive scope selectors. argparse itself rejects any
    # combination of two of these; "at least one" is enforced in main().
    scope_group = parser.add_mutually_exclusive_group()
    scope_group.add_argument(
        "--pass",
        dest="pass_id",
        metavar="PASS_ID",
        help="Single pass ID to run",
    )
    scope_group.add_argument(
        "--passes",
        dest="passes",
        metavar="PASS_ID[,PASS_ID,...]",
        help="Comma-separated list of pass IDs to run",
    )
    scope_group.add_argument(
        "--all",
        action="store_true",
        help="Run all passes in the intent file",
    )
    scope_group.add_argument(
        "--batch",
        dest="batch",
        metavar="EP###_CONT_###|EP###_ONER_###|BATCH_###",
        help=(
            "Continuity/oner batch selector for a single-batch --new-take "
            "reroll, or persisted scene id for --lock-scene/--unlock-scene"
        ),
    )
    scope_group.add_argument(
        "--storyboard",
        dest="storyboard",
        metavar="EP###_CONT_###|EP###_ONER_###",
        help=(
            "Build a storyboard strip for one continuity/oner batch; "
            "canonical surface is `rederive --batch X --board-only`"
        ),
    )
    scope_group.add_argument(
        "--approve-board",
        dest="approve_board",
        metavar="EP###_CONT_###|EP###_ONER_###",
        help="Approve a proposed storyboard strip for one batch",
    )
    scope_group.add_argument(
        "--reject-board",
        dest="reject_board",
        metavar="EP###_CONT_###|EP###_ONER_###",
        help="Reject a proposed storyboard strip for one batch",
    )
    scope_group.add_argument(
        "--revalidate-board",
        dest="revalidate_board",
        metavar="EP###_CONT_###|EP###_ONER_###",
        help="Re-bless a previously approved storyboard strip without rendering",
    )
    scope_group.add_argument(
        "--story-gate-eval",
        dest="story_gate_eval",
        action="store_true",
        help="Run offline StoryGate calibration against committed seed labels",
    )

    # Board decision / scene lock modifiers
    parser.add_argument(
        "--reason",
        default=None,
        metavar="TEXT",
        help="Optional human approve/reject reason; valid only with --approve-board/--reject-board.",
    )
    parser.add_argument(
        "--lock-scene",
        dest="lock_scene",
        nargs="?",
        const="manual",
        default=None,
        metavar="REASON",
        help="Lock the scene selected by --batch, optionally with a reason.",
    )
    parser.add_argument(
        "--unlock-scene",
        dest="unlock_scene",
        action="store_true",
        help="Unlock the scene selected by --batch.",
    )
    parser.add_argument(
        "--route",
        default=None,
        choices=("board_problem", "script_problem", "style", "technical", "other"),
        metavar="ROUTE",
        help=(
            "Optional human rejection taxonomy route_hint for --approve-board/"
            "--reject-board. Choices intentionally differ from judge routes: "
            "includes style/technical and excludes prompt_problem/mixed/"
            "judge_unavailable."
        ),
    )
    parser.add_argument(
        "--auto-reroll",
        dest="auto_reroll",
        action="store_true",
        help="Automatically reroll fixable HARD storyboard gate failures; valid only with --storyboard.",
    )
    parser.add_argument(
        "--max-board-attempts",
        dest="max_board_attempts",
        type=int,
        default=3,
        choices=(2, 3, 4),
        metavar="N",
        help="Total board attempts for --storyboard --auto-reroll (default: 3).",
    )
    parser.add_argument(
        "--grouping",
        choices=("auto", "continuity", "coverage", "oner", "solo"),
        default="auto",
        help="Grouping strategy to assemble for the selected scope",
    )

    # Run-mode flags
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Validate + estimate cost only; no generation, no lockfile",
    )
    parser.add_argument(
        "--validate",
        action="store_true",
        help="Validate passes file only; exit 0 if no BLOCKs, exit 2 if any BLOCK",
    )
    parser.add_argument(
        "--retry",
        action="store_true",
        help="Re-run a failed pass; aborts if PassStore shows it already completed or in-flight",
    )
    parser.add_argument(
        "--force-retry",
        action="store_true",
        help="Like --retry, but proceed if this machine has no local PassStore record",
    )
    parser.add_argument(
        "--new-take",
        "--reroll",
        dest="new_take",
        action="store_true",
        help="Append a new take for a single succeeded r2v_multi pass",
    )
    parser.add_argument(
        "--seed",
        type=int,
        default=None,
        help="Optional provider seed for --new-take",
    )
    parser.add_argument(
        "--make-primary",
        action="store_true",
        help="Promote a successful --new-take result to primary",
    )
    parser.add_argument(
        "--strategy",
        dest="strategy",
        default=None,
        metavar="NAME",
        help="Author-strategy override for --new-take (e.g. shot_spec)",
    )
    parser.add_argument(
        "--no-staleness-guard",
        dest="no_staleness_guard",
        action="store_true",
        help=(
            "Bypass the D3 coverage_passes staleness guard (dispatch even if the "
            "locked passes were built against a different plan structural_sha)."
        ),
    )
    parser.add_argument(
        "--budget",
        dest="budget",
        type=float,
        default=25.0,
        metavar="USD",
        help="Budget cap in USD for live runs (default: 25.0)",
    )

    # StoryGate calibration flags (only meaningful with --story-gate-eval)
    parser.add_argument(
        "--labels",
        dest="story_gate_labels",
        type=Path,
        default=None,
        metavar="PATH",
        help="StoryGate calibration labels JSON; valid only with --story-gate-eval.",
    )
    parser.add_argument(
        "--samples",
        dest="story_gate_samples",
        type=int,
        default=None,
        metavar="K",
        help="StoryGate calibration samples per board/tier/crops mode; valid only with --story-gate-eval.",
    )
    parser.add_argument(
        "--tier",
        dest="story_gate_tiers",
        action="append",
        default=None,
        metavar="MODEL",
        help="StoryGate calibration model tier; repeat for multiple tiers.",
    )
    parser.add_argument(
        "--no-crops-compare",
        dest="story_gate_no_crops_compare",
        action="store_true",
        help="StoryGate calibration only runs crops=True; valid only with --story-gate-eval.",
    )
    return parser


def main() -> int:
    """Parse ``sys.argv`` and run the requested CLI action.

    Dispatch order:

    1. If the first argv token is ``edit-pass``, ``concept``, ``rederive`` or
       ``currency-check``, the remaining argv is handed to that subcommand's
       runner and its return value is returned unchanged.
    2. Otherwise the flat CLI is parsed. Flag-combination errors are reported
       either through ``parser.error`` (argparse usage error, which exits the
       process) or as a structured JSON object on stdout with
       ``EXIT_VALIDATION``.
    3. One action runs, checked in this order: StoryGate eval, storyboard
       build, board approve/reject, board revalidate, scene lock/unlock,
       and finally coverage-pass generation via ``run_generation``.

    Returns:
        The process exit code. For the generation path: ``EXIT_OK`` when the
        result has ``success is True``; ``EXIT_VALIDATION`` for the known
        validation error codes; ``EXIT_LOCKED`` for ``"locked"``;
        ``EXIT_SIGNALLED`` when the result is marked cancelled; otherwise
        ``EXIT_PARTIAL``. Other actions return the code their runner reports.

    Raises:
        SystemExit: raised by argparse on usage errors and ``--help``.
    """
    # krea2-flora Phase 4/5: subcommands are dispatched off the first argv
    # token, leaving the existing flat CLI (the default, subcommand-less
    # invocation) untouched. Handlers are looked up only when matched.
    subcommand = sys.argv[1] if len(sys.argv) > 1 else None
    if subcommand == "edit-pass":
        return _run_edit_pass_cli(sys.argv[2:])
    if subcommand == "concept":
        return _run_concept_cli(sys.argv[2:])
    if subcommand == "rederive":
        return _run_rederive_cli(sys.argv[2:])
    if subcommand == "currency-check":
        return _run_currency_check_cli(sys.argv[2:])

    parser = _build_main_arg_parser()
    args = parser.parse_args()
    effective_retry = args.retry or args.force_retry

    def _structured_cli_result(error: str, message: str) -> int:
        """Print a failure object as JSON on stdout; return EXIT_VALIDATION."""
        print(json.dumps({"success": False, "error": error, "message": message}, indent=2))
        return EXIT_VALIDATION

    board_selector = args.storyboard or args.approve_board or args.reject_board or args.revalidate_board
    scene_lock_action = args.lock_scene is not None or args.unlock_scene
    eval_flags_present = (
        args.story_gate_labels is not None
        or args.story_gate_samples is not None
        or args.story_gate_tiers is not None
        or args.story_gate_no_crops_compare
    )
    # Flags that only make sense for a generation/validation run. The same
    # set is rejected for board actions, scene locks and StoryGate eval.
    generation_flags_present = (
        args.validate
        or effective_retry
        or args.new_take
        or args.seed is not None
        or args.make_primary
        or args.strategy is not None
    )

    # ── argparse-level usage errors (parser.error exits the process) ──
    if eval_flags_present and not args.story_gate_eval:
        parser.error("--labels/--samples/--tier/--no-crops-compare are valid only with --story-gate-eval")
    if (args.reason is not None or args.route is not None) and not (
        args.approve_board or args.reject_board
    ):
        parser.error("--reason/--route are valid only with --approve-board/--reject-board")
    if args.auto_reroll and not args.storyboard:
        parser.error("--auto-reroll is valid only with --storyboard")
    # An explicit `--max-board-attempts 3` is indistinguishable from the
    # default here, so only non-default values are rejected.
    if args.max_board_attempts != 3 and not args.auto_reroll:
        parser.error("--max-board-attempts is valid only with --storyboard --auto-reroll")
    if args.lock_scene is not None and args.unlock_scene:
        parser.error("--lock-scene and --unlock-scene are mutually exclusive")
    if scene_lock_action and not args.batch:
        parser.error("--lock-scene/--unlock-scene require --batch")

    # ── structured (JSON) flag-conflict errors ──
    # --dry-run is deliberately not rejected here: --storyboard forwards it.
    # NOTE(review): --dry-run is accepted but not forwarded for
    # --approve-board/--reject-board/--revalidate-board — confirm intended.
    if board_selector and generation_flags_present:
        return _structured_cli_result(
            "invalid_args",
            "--storyboard/--approve-board/--reject-board/--revalidate-board are incompatible with generation flags.",
        )

    if scene_lock_action and (
        generation_flags_present
        or args.dry_run
        or args.grouping != "auto"
        or args.pass_id
        or args.passes
        or args.all
        or board_selector
        or args.story_gate_eval
    ):
        return _structured_cli_result(
            "invalid_args",
            "--lock-scene/--unlock-scene require only --project, --episode, and --batch.",
        )

    # ── StoryGate calibration ──
    if args.story_gate_eval:
        if generation_flags_present or args.dry_run or args.grouping != "auto":
            return _structured_cli_result(
                "invalid_args",
                "--story-gate-eval is incompatible with generation/storyboard flags.",
            )
        logging.basicConfig(
            level=logging.WARNING,
            format="%(asctime)s %(name)s %(levelname)s %(message)s",
            stream=sys.stderr,
        )
        try:
            code, report, table = _run_story_gate_eval(
                labels=args.story_gate_labels or _default_story_gate_labels_path(),
                samples=(
                    args.story_gate_samples
                    if args.story_gate_samples is not None
                    else 5
                ),
                tiers=args.story_gate_tiers,
                no_crops_compare=args.story_gate_no_crops_compare,
            )
        except ValueError as exc:
            return _structured_cli_result("invalid_args", str(exc))
        print(json.dumps(report, indent=2, default=str))
        print(table)
        return code

    # ── storyboard build ──
    if args.storyboard:
        if args.auto_reroll:
            print(
                f"auto-reroll: up to {args.max_board_attempts} board attempts, "
                "~$0.41 each"
            )
        result = _run_storyboard_build(
            project=args.project,
            episode=args.episode,
            batch=args.storyboard,
            dry_run=args.dry_run,
            auto_reroll=args.auto_reroll,
            max_board_attempts=args.max_board_attempts,
        )
        stop_block = (
            _auto_reroll_stop_block(args.project, result)
            if args.auto_reroll
            else None
        )
        if stop_block:
            print(stop_block)
        print(_json_line(result))
        return _storyboard_result_exit_code(result)

    # ── board approve / reject ──
    if args.approve_board or args.reject_board:
        code, result = _run_board_decision(
            project=args.project,
            episode=args.episode,
            batch=args.approve_board or args.reject_board,
            decision="approve" if args.approve_board else "reject",
            reason=args.reason,
            route=args.route,
        )
        print(_json_line(result))
        return code

    # ── board revalidate ──
    if args.revalidate_board:
        code, result = _run_revalidate_board(
            project=args.project,
            episode=args.episode,
            batch=args.revalidate_board,
        )
        print(_json_line(result))
        return code

    # ── scene lock / unlock ──
    if scene_lock_action:
        code, result = _run_scene_lock(
            project=args.project,
            episode=args.episode,
            batch=args.batch,
            reason=args.lock_scene,
            unlock=args.unlock_scene,
        )
        print(_json_line(result))
        return code

    # ── generation-path flag validation ──
    if args.new_take and not args.pass_id and not args.batch:
        return _structured_cli_result(
            "new_take_requires_single_pass",
            "--new-take requires exactly one pass via --pass or a --batch selector.",
        )
    if args.batch and not args.new_take:
        return _structured_cli_result(
            "flag_requires_new_take",
            "--batch requires --new-take.",
        )
    if (
        args.seed is not None or args.make_primary or args.strategy is not None
    ) and not args.new_take:
        return _structured_cli_result(
            "flag_requires_new_take",
            "--seed, --make-primary, and --strategy require --new-take.",
        )
    if args.validate and args.grouping not in ("auto", "coverage"):
        return _structured_cli_result(
            "validate_requires_coverage_grouping",
            "--validate validates coverage plans only.",
        )

    # Enforce a scope selector unless --validate (which validates all passes
    # regardless). Board and StoryGate selectors have already returned above,
    # so only the generation selectors remain to be checked.
    if not args.validate and not (
        args.pass_id or args.passes or args.all or args.batch
    ):
        parser.error(
            "Exactly one of --pass, --passes, --all, --batch, --storyboard, "
            "--approve-board, --reject-board, --revalidate-board, or "
            "--story-gate-eval is required (or use --validate)."
        )
        # Defensive: parser.error() exits; only reached if it is overridden.
        return EXIT_VALIDATION

    # Set up logging: stderr only; INFO for dry-run/validate, WARNING otherwise
    log_level = logging.INFO if (args.dry_run or args.validate) else logging.WARNING
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
        stream=sys.stderr,
    )

    # Map argv to pass_ids
    if args.pass_id:
        pass_ids = [args.pass_id]
    elif args.passes:
        # Strip blanks and de-duplicate while keeping first-seen order.
        pass_ids = list(dict.fromkeys(p.strip() for p in args.passes.split(",") if p.strip()))
    else:
        pass_ids = None  # --all, --batch or --validate

    try:
        result = run_generation(
            project=args.project,
            episode=args.episode,
            pass_ids=pass_ids,
            grouping=args.grouping,
            dry_run=args.dry_run,
            validate_only=args.validate,
            retry=effective_retry,
            force_retry=args.force_retry,
            budget_usd=args.budget,
            force_new_take=args.new_take,
            seed=args.seed,
            make_primary=args.make_primary,
            batch=args.batch,
            strategy=args.strategy,
            enforce_staleness_guard=not args.no_staleness_guard,
        )
    except ValueError as exc:
        # Empty pass_ids or similar validation errors
        result = {"success": False, "error": "invalid_args", "message": str(exc)}

    # Emit JSON to stdout
    print(json.dumps(result, indent=2, default=str))

    # Determine exit code
    if result.get("success") is True:
        return EXIT_OK

    error = result.get("error", "")
    if error in (
        "validation_blocked", "passes_file_missing", "pass_filter_unmatched",
        "passes_parse_error", "passes_file_read_error", "invalid_args",
        "already_completed", "orphaned_in_flight", "retry_requires_single_pass",
        "retry_record_missing",
        "new_take_requires_single_pass", "flag_requires_new_take",
        "new_take_requires_single_r2v_multi_beat",
        "reroll_requires_succeeded_primary", "reroll_refs_drifted",
        "reroll_segment_drift", "reroll_collision",
        "reroll_non_coverage_deferred", "validate_requires_coverage_grouping",
        "invalid_batch_selector", "unknown_author_strategy",
        "batch_selector_metadata_mismatch", "batch_scene_missing",
        "reroll_identity_unresolvable", "coverage_passes_stale",
    ):
        return EXIT_VALIDATION

    if error == "locked":
        return EXIT_LOCKED

    if result.get("cancelled"):
        return EXIT_SIGNALLED

    # Partial failures (some passes failed but run completed)
    return EXIT_PARTIAL


if __name__ == "__main__":
    # Script entry point: main()'s integer return value becomes the process
    # exit status.
    raise SystemExit(main())
