# ==============================================================================
# PORTED FROM STARSEND: orchestrator/step_runner.py
# DATE: 2026-03-29
# ==============================================================================
"""
step_runner.py — Unified generation executor for CLI and Production Console.

Single source of truth for all generation execution. Thread-safe: all store
mutations go through ExecutionStore's internal lock. Synchronous: blocks
until generation is complete. CLI calls directly; Console wraps in
ThreadPoolExecutor.

Migration order: Phase 3 adds execute_video, Phase 6 adds execute_keyframe,
Phase 7 adds transition().
"""

import logging
import os
import re
import subprocess
import time
import urllib.request
from pathlib import Path
from typing import Optional

from recoil.execution.execution_store import ExecutionStore
from recoil.execution.api_client import get_client  # CP-2 Phase 8 — stub re-exports
from recoil.pipeline._lib.sanctioned_fallbacks import fire_sanctioned_fallback
from recoil.execution.providers.payload_hints import StepRunnerHints, coerce_to_dict
from recoil.execution.types import GenerationResult
from recoil.core import model_profiles
from recoil.core.naming import build_filename, next_take_number

_GROUPING_FILENAME_ARGS_UNSET = object()
_FFMPEG_AVAILABLE: bool | None = None
_FFMPEG_UNAVAILABLE_MESSAGE = (
    "ffmpeg unavailable — cannot run video identity gates; refusing to dispatch "
    "(would fail-open every take). Install ffmpeg or run with gates disabled."
)


def _prompt_engine_schema_version() -> str:
    """Return the prompt_engine schema hash. Lazy import — keeps step_runner
    import cheap and avoids circular-import surface during construction.

    R4 fix: was hardcoded module-path string; now returns the 12-char hex hash
    so sidecars survive the audit-gate hash-format assertion #18.
    """
    try:
        from recoil.pipeline._lib.prompt_engine import PROMPT_ENGINE_SCHEMA_VERSION

        return PROMPT_ENGINE_SCHEMA_VERSION
    except Exception:
        return "unknown"


def _ensure_ffmpeg_available() -> None:
    """Verify ffmpeg is runnable once per process for video identity gates."""
    global _FFMPEG_AVAILABLE

    if _FFMPEG_AVAILABLE is True:
        return
    if _FFMPEG_AVAILABLE is False:
        raise RuntimeError(_FFMPEG_UNAVAILABLE_MESSAGE)

    try:
        result = subprocess.run(["ffmpeg", "-version"], capture_output=True, timeout=10)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        _FFMPEG_AVAILABLE = False
        raise RuntimeError(_FFMPEG_UNAVAILABLE_MESSAGE)

    if result.returncode != 0:
        _FFMPEG_AVAILABLE = False
        raise RuntimeError(_FFMPEG_UNAVAILABLE_MESSAGE)

    _FFMPEG_AVAILABLE = True


from recoil.execution.step_types import (  # noqa: E402
    StepResult,
    ProjectPaths,
    GateVerdict,
    GateFunction,
    PassResult,
    SegmentResult,
)
from recoil.execution.feedback import FeedbackAgent, FeedbackAttempt  # noqa: E402
from recoil.core.critic import Outcome, Severity  # noqa: E402


# ──────────────────────────────────────────────────────────────────────
# Canonical shot-label helpers (Phase 2.5 Task 9)
# ──────────────────────────────────────────────────────────────────────


def _make_shot_label_for_keyframe(shot_id: str, project: str) -> str | None:
    """Filename label for keyframes: {PROJECT}_{EP}_S{NN}.

    Matches the format used by _save_keyframe. Used by _next_take_number
    to find existing keyframe files on disk. Returns None if shot_id can't
    be parsed — caller should fall back to using shot_id directly.
    """
    m = re.match(r"(EP\d+)_SH(\d+[A-Z]*)", shot_id)
    if m:
        return f"{project.upper()}_{m.group(1)}_S{m.group(2)}"
    return None


def _make_shot_label_canonical(
    shot_id: str, project: str = ""
) -> dict[str, str | None]:
    """Return all label variants for a shot_id (None for unparseable forms).

    `video` was historically a shot_{NNN} label produced by the now-deleted
    _make_shot_label_for_video. Video filenames are produced by
    build_filename (recoil.core.naming); this dict is kept for
    keyframe-only consumers.
    """
    return {
        "keyframe": _make_shot_label_for_keyframe(shot_id, project),
        "video": None,
    }


# ---------------------------------------------------------------------------
# Gate factories — adapters from `recoil.pipeline._lib.validation` gates to GateFunction protocol
# ---------------------------------------------------------------------------


def make_identity_gate(
    ref_paths: list[Path],
    prompt_skeleton: Optional[dict] = None,
    wardrobe_phase_id: Optional[str] = None,
    wardrobe_description: Optional[str] = None,
    continuity_ref_path: Optional[Path] = None,
    preceding_shot_path: Optional[Path] = None,
    preceding_shot_type: Optional[str] = None,
    current_shot_type: Optional[str] = None,
) -> GateFunction:
    """Factory: creates a progressive 2-stage Gate 2 as a GateFunction.

    Stage 2A (Character): Identity, hair, wardrobe, accessories.
      - Casting refs (2-3) + text wardrobe spec + last approved frame + keyframe
    Stage 2B (Environment): Background, lighting, spatial continuity.
      - Preceding shot + keyframe (skipped if shot types too different)

    Scoring: MINOR=1, NOTICEABLE=3, CRITICAL=5
      - total <= 1: PASS
      - total 2-3: FAIL + retriable (auto-regenerate)
      - total > 3: FAIL + non-retriable (flag for human review)

    Cost: $0.039 (2A only) to $0.078 (2A + 2B) per frame.
    """

    def gate_fn(keyframe_path: Path, shot_data: dict) -> GateVerdict:
        from recoil.core.paths import ensure_pipeline_importable

        ensure_pipeline_importable()
        from recoil.pipeline._lib.validation import Validator

        v = Validator()
        result = v.run_gate_2(
            keyframe_path=keyframe_path,
            ref_paths=ref_paths,
            prompt_skeleton=prompt_skeleton,
            wardrobe_phase_id=wardrobe_phase_id,
            wardrobe_description=wardrobe_description,
            continuity_ref_path=continuity_ref_path,
            preceding_shot_path=preceding_shot_path,
            preceding_shot_type=preceding_shot_type,
            current_shot_type=current_shot_type,
        )
        total_score = result.details.get("total_score", 0)
        retriable = (not result.passed) and (total_score <= 3)

        return GateVerdict(
            passed=result.passed,
            gate_name="gate_2",
            reason=_summarize_gate_2(result.details),
            details=result.details,
            cost=result.cost,
            retriable=retriable,
        )

    return gate_fn


def make_video_identity_gate(
    ref_paths: list[Path],
    prompt_skeleton: Optional[dict] = None,
    wardrobe_phase_id: Optional[str] = None,
    wardrobe_description: Optional[str] = None,
    extract_at_s: float = 0.5,
) -> GateFunction:
    """Gate 2 identity check for video takes.

    Extract one frame from the take video, then delegate scoring and verdict
    mapping to make_identity_gate. Frame extraction failures fail CLOSED with a
    retriable infra verdict tagged gate_closed_by=infra; an infra miss must not
    pass an ungated take.
    """
    inner = make_identity_gate(
        ref_paths=ref_paths,
        prompt_skeleton=prompt_skeleton,
        wardrobe_phase_id=wardrobe_phase_id,
        wardrobe_description=wardrobe_description,
    )

    _IMAGE_SUFFIXES = {".jpg", ".jpeg", ".png", ".webp", ".bmp"}

    def gate_fn(media_path: Path, shot_data: dict) -> GateVerdict:
        import tempfile

        media_path = Path(media_path)
        # execute_pass passes an already-extracted boundary FRAME (jpg); only
        # execute_video passes a video file. Score an image directly — running
        # ffmpeg -ss on a still fails and would fail-open, making the gate a
        # no-op on the r2v_multi path.
        if media_path.suffix.lower() in _IMAGE_SUFFIXES:
            frame = media_path
        else:
            frame = Path(tempfile.gettempdir()) / f"{media_path.stem}_idgate.jpg"
            try:
                subprocess.run(
                    [
                        "ffmpeg",
                        "-y",
                        "-ss",
                        f"{extract_at_s:.3f}",
                        "-i",
                        str(media_path),
                        "-frames:v",
                        "1",
                        "-q:v",
                        "2",
                        str(frame),
                    ],
                    capture_output=True,
                    timeout=30,
                    check=True,
                )
            except Exception as e:
                logger.warning(
                    "video identity gate: frame extract failed for %s: %s",
                    media_path,
                    e,
                )
                return GateVerdict(
                    passed=False,
                    gate_name="gate_2_video",
                    reason=f"frame_extract_failed:infra: {e}",
                    details={"gate_closed_by": "infra"},
                    cost=0.0,
                    retriable=True,
                    deferred=False,
                )

        verdict = inner(frame, shot_data)
        return GateVerdict(
            passed=verdict.passed,
            gate_name="gate_2_video",
            reason=verdict.reason,
            details=verdict.details,
            cost=verdict.cost,
            retriable=verdict.retriable,
        )

    return gate_fn


def build_identity_gates_from_payload(payload: dict) -> "Optional[list[GateFunction]]":
    """Build identity gate(s) from a SERIALIZABLE payload spec at dispatch time.

    The payload carries ``identity_gate_ref_paths`` (strings) rather than the gate
    callable so the persisted Workflow stays JSON-serializable (save_scene). Runners
    call this just before execute_pass / execute_video. An in-memory ``gates``
    callable list (test path) takes precedence. Returns None when no gate is set.
    """
    gates = payload.get("gates")
    if gates:
        return gates
    ref_strs = payload.get("identity_gate_ref_paths")
    if not ref_strs:
        return None
    return [make_video_identity_gate(ref_paths=[Path(p) for p in ref_strs])]


def make_video_drift_gate(
    ref_paths: list[Path],
    shot_metadata: Optional[dict] = None,
) -> GateFunction:
    """Factory: creates Gate 3 video drift check as a GateFunction.

    Returns DEFERRED verdict when drift detected — pipeline continues
    but shot is flagged for mandatory human review before export.
    API failures also produce DEFERRED (fail-open-but-flag).
    """

    def gate_fn(video_path: Path, shot_data: dict) -> GateVerdict:
        from recoil.core.paths import ensure_pipeline_importable

        ensure_pipeline_importable()
        from recoil.pipeline._lib.validation import Validator

        v = Validator()
        result = v.run_gate_3(
            video_path=video_path,
            ref_paths=ref_paths,
            shot_metadata=shot_metadata or shot_data,
        )
        drift_detected = result.details.get("deferred", False)
        drift_count = result.details.get("drift_count", 0)

        return GateVerdict(
            passed=True,  # Always pass — Gate 3 uses DEFERRED, not reject
            gate_name="gate_3",
            reason=(
                f"Video drift: {drift_count} frames flagged"
                if drift_detected
                else "No drift detected"
            ),
            details=result.details,
            cost=result.cost,
            retriable=False,
            deferred=drift_detected,
        )

    return gate_fn


def _summarize_gate_2(details: dict) -> str:
    """Summarize Gate 2 results (2A + optional 2B) into a human-readable string."""
    if "error" in details:
        return f"Gate 2 error: {details['error']}"
    mismatches = details.get("mismatches", [])
    total = details.get("total_score", 0)
    action = details.get("_action", "unknown")
    if not mismatches:
        return "No mismatches (score 0)"
    parts = [
        f"[{m.get('category', '?')}] {m.get('visual_evidence', m.get('description', '?'))} ({m.get('severity', '?')})"
        for m in mismatches
    ]
    return f"Score {total} → {action}: " + "; ".join(parts)


logger = logging.getLogger(__name__)


# ──────────────────────────────────────────────────────────────────────
# YAML generation metadata sidecar (Phase 3 — Seedance Prompt Pipeline)
# ──────────────────────────────────────────────────────────────────────


def _write_seeddance_meta_sidecar(
    video_path: Path,
    shot_id: str,
    model: str,
    prompt: str,
    duration: int,
    aspect_ratio: str,
    video_mode: str,
    cost_usd: float,
    latency_seconds: float,
    start_frame: Optional[Path] = None,
    reference_images: Optional[list[str]] = None,
) -> None:
    """Write a YAML metadata sidecar alongside a SeedDance video file.

    Path: {output_dir}/{shot_id}_meta.yaml

    Contains generation ID, timestamp, model, endpoint, prompt details,
    inputs, parameters, cost, and latency for post-hoc analysis.
    """
    import yaml
    from datetime import datetime, timezone

    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    gen_id = f"{shot_id}_{int(time.time())}"

    # Map video_mode to endpoint name
    endpoint_map = {
        "image2video": "i2v",
        "text2video": "t2v",
    }
    endpoint = endpoint_map.get(video_mode, "r2v")

    # R2V correction: text2video + reference_images = r2v, not t2v
    if video_mode == "text2video" and reference_images:
        endpoint = "r2v"

    # Determine prompt builder name from endpoint
    builder_map = {
        "i2v": "build_seeddance_i2v_prompt",
        "t2v": "build_seeddance_t2v_prompt",
        "r2v": "build_seeddance_r2v_prompt",
    }
    prompt_builder = builder_map.get(endpoint, "unknown")

    # Calculate cost from model_profiles pricing
    try:
        cost_per_second = model_profiles.get_profile(model).get("cost_per_second", 0.0)
        calculated_cost = duration * cost_per_second
    except (KeyError, TypeError):
        calculated_cost = cost_usd

    meta = {
        "generation": {
            "id": gen_id,
            "timestamp": timestamp,
            "model": model,
            "endpoint": endpoint,
            "prompt_builder": prompt_builder,
            "prompt_text": prompt,
            "prompt_word_count": len(prompt.split()),
            "inputs": {
                "character_refs": reference_images or [],
                "start_frame": str(start_frame) if start_frame else None,
            },
            "parameters": {
                "duration": duration,
                "aspect_ratio": aspect_ratio,
            },
            "cost_usd": round(calculated_cost, 4),
            "latency_seconds": round(latency_seconds, 2),
        }
    }

    meta_path = video_path.parent / f"{shot_id}_meta.yaml"
    with open(meta_path, "w", encoding="utf-8") as f:
        yaml.safe_dump(
            meta, f, default_flow_style=False, sort_keys=False, allow_unicode=True
        )

    logger.info("Wrote YAML meta sidecar: %s", meta_path.name)


def _aggregate_keyframe_refs(
    scene_ref_path,
    pose_ref_path,
    identity_refs,
    expression_refs,
    reference_images,
) -> list[str]:
    """Aggregate every ref path that reached the keyframe model into a
    de-duped list of str, preserving discovery order.

    Inputs accept None / Path / str / list[Path|str]. Output is the canonical
    refs_used list for keyframe sidecar emission.

    Includes the flat ``reference_images`` list (fal-routed multi-ref models
    like gpt-image-2 i2i). Prior to 2026-05-26 the sidecar aggregation
    omitted this source, so successful multi-ref edit calls landed with
    ``refs_used=[]`` even though the model received refs end-to-end.
    """
    out: list[str] = []
    if scene_ref_path is not None:
        out.append(str(scene_ref_path))
    if pose_ref_path is not None:
        s = str(pose_ref_path)
        if s not in out:
            out.append(s)
    for src in (identity_refs, expression_refs, reference_images):
        for r in src or []:
            s = str(r)
            if s not in out:
                out.append(s)
    return out


def _synthesize_take_id(take_record: dict, shot_id: str, take_idx: int) -> str:
    """Derive a take_id from a persisted take record.

    Must match lineage._resolve_take_id exactly — callers key on this value.
    Can't import from api/ (circular), so the logic is duplicated here.
    """
    raw = take_record.get("take_id")
    if raw:
        return str(raw)
    raw_num = take_record.get("take_number") or take_record.get("take_num") or take_idx
    try:
        return f"{shot_id}_T{int(raw_num):03d}"
    except (ValueError, TypeError):
        return f"{shot_id}_T{take_idx:03d}"


class SessionCostLogger:
    """Optional cost logger interface. CLI passes an implementation."""

    def record(self, shot_id: str, cost: float, model: str, pipeline: str) -> None:
        """Record a generation cost. Override in subclass."""
        pass


class StepRunner:
    """Single source of truth for all generation execution.

    Thread-safe: all store mutations go through ExecutionStore's internal lock.
    Synchronous: blocks until generation is complete.
    Callers: CLI calls directly; Console wraps in ThreadPoolExecutor.
    """

    def __init__(
        self,
        store: ExecutionStore,
        paths: ProjectPaths,
        cost_logger: Optional[SessionCostLogger] = None,
        validate_frames: bool = True,
        episode: Optional[int] = None,
    ) -> None:
        self._store = store
        self._paths = paths
        self._cost_logger = cost_logger
        self._validate_frames = validate_frames
        self._episode = episode
        # CP-2 Phase 9 — sidecar enrichment (additive). Default "unknown";
        # callers (generate.py / dispatch_cli.py / Production Console /
        # workspace MCP) plumb their own value in CP-5 by setting this attr
        # after construction. Threaded onto every sidecar write below.
        self._dispatch_path: str = "unknown"

    def _stamp_context_hints(self, hints: dict) -> dict:
        stamped = dict(hints)
        stamped["project"] = self._paths.project
        stamped["episode"] = self._episode
        return stamped

    def execute_video(
        self,
        shot_id: str,
        prompt: str,
        model: str,
        start_frame: Optional[Path] = None,
        end_frame: Optional[Path] = None,
        image_tail: Optional[str] = None,
        duration: int = 5,
        aspect_ratio: str = "9:16",
        gates: Optional[list[GateFunction]] = None,
        elements_payload: Optional[dict] = None,
        generate_audio: bool = True,
        on_status=None,
        reference_images: Optional[list[str]] = None,
        negative_prompt: Optional[str] = None,
        inputs_snapshot: dict | None = None,
        reference_videos: Optional[list[str]] = None,
        provider_hints: Optional[dict] = None,
        parent_take_id: Optional[str] = None,
        grouping: dict | None = None,
    ) -> StepResult:
        """Execute a video generation step. Blocks until complete.

        When start_frame is provided, generates image-to-video (I2V).
        When start_frame is None, generates text-to-video (T2V).
        When reference_videos is provided with provider_hints["endpoint"]="o3_edit_pro",
        generates video-to-video edit (V2V).

        Lifecycle:
        1. Transition to video_submitted
        2. Transition to video_processing
        3. Call API, wait for result
        4. On failure → video_failed, return
        5. On success → save video, run gates, video_complete, append take
        6. Return StepResult
        """
        cost = 0.0
        _gen_start_time = time.time()
        grouping_filename_args = self._video_grouping_filename_args(
            shot_id,
            grouping,
        )
        frames_valid_for_probe = (end_frame is None or end_frame.exists()) and (
            start_frame is None or start_frame.exists()
        )
        if gates and frames_valid_for_probe:
            _ensure_ffmpeg_available()

        try:
            # 1. Transition to video_submitted
            self._store.update_shot(shot_id, status="video_submitted")

            # 2. Transition to video_processing
            self._store.update_shot(shot_id, status="video_processing")

            # 3. Build payload and call API
            import base64

            # Validate end frame existence before building payload
            if end_frame is not None and not end_frame.exists():
                raise FileNotFoundError(
                    f"end_frame specified but not found: {end_frame}"
                )
            if start_frame is not None and not start_frame.exists():
                raise FileNotFoundError(
                    f"start_frame specified but not found: {start_frame}"
                )

            # CP-2 Phase 5 — provider-aware path via VideoModelClient.
            # Replaces the legacy 3-way client_type branching (Veo / Wan /
            # Kling-SeedDance else). Provider quirks (file paths vs base64
            # vs data URIs, dataclass vs dict) now live in the adapters;
            # StepRunner emits a single unified payload.
            unified_payload: dict = {
                "prompt": prompt,
                "duration": duration,
                "aspect_ratio": aspect_ratio,
                "shot_id": shot_id,
            }
            if isinstance(grouping, dict):
                unified_payload["grouping"] = dict(grouping)

            if start_frame is not None:
                unified_payload["image"] = base64.b64encode(
                    start_frame.read_bytes()
                ).decode()

            if image_tail is not None and "image_tail" not in unified_payload:
                unified_payload["image_tail"] = image_tail

            if end_frame is not None:
                unified_payload["image_tail"] = base64.b64encode(
                    end_frame.read_bytes()
                ).decode()

            if reference_images:
                unified_payload["reference_images"] = list(reference_images)

            if reference_videos:
                unified_payload["reference_videos"] = list(reference_videos)

            if generate_audio:
                unified_payload["generate_audio"] = True

            if negative_prompt:
                unified_payload["negative_prompt"] = negative_prompt

            if provider_hints:
                existing = coerce_to_dict(unified_payload.get("hints"))
                merged = {**existing, **provider_hints}
                normalized = {
                    k: v for k, v in merged.items() if k in StepRunnerHints.model_fields
                }
                extra = {
                    k: v
                    for k, v in merged.items()
                    if k not in StepRunnerHints.model_fields
                }
                unified_payload["hints"] = StepRunnerHints(**normalized, **extra)

            if elements_payload and "elements" in elements_payload:
                # Legacy fal.ai O3 elements pathway — surface via hints.
                # Phase C: typed StepRunnerHints construction with dict-comp
                # filter to normalize legacy hint keys; extra="allow" lets
                # adapter-specific keys (like Kling's "endpoint") ride along
                # without erroring.
                existing = coerce_to_dict(unified_payload.get("hints"))
                normalized = {
                    k: v
                    for k, v in existing.items()
                    if k in StepRunnerHints.model_fields
                }
                unified_payload["hints"] = StepRunnerHints(
                    elements=elements_payload["elements"],
                    endpoint="o3_ref_standard",
                    **normalized,
                )

            payload = unified_payload

            # Log end frame if specified (existence already validated above)
            if end_frame is not None:
                logger.info(
                    "End frame specified for shot %s: %s (model: %s)",
                    shot_id,
                    end_frame,
                    model,
                )

            # Validate start frame before submission. BLOCKING on hard failure —
            # a wrong-character start frame will burn budget in Kling for a doomed shot.
            # ERROR outcomes route to pending_qc per JT's middle-option decision (2026-04-09).
            if self._validate_frames and start_frame is not None:
                try:
                    from recoil.core.paths import ensure_pipeline_importable

                    ensure_pipeline_importable()
                    from recoil.pipeline._lib.critics.start_frame_critic import (
                        StartFrameCritic,
                    )

                    # Plumb character descriptions from inputs_snapshot so the
                    # CHARACTER_IDENTITY checks actually run (Opus audit finding,
                    # 2026-04-09 — was a ghost critic before this fix).
                    character_descriptions = []
                    expected_elements = []
                    if inputs_snapshot:
                        characters = inputs_snapshot.get("characters", [])
                        for char in characters:
                            character_descriptions.append(
                                {
                                    "name": char.get("display_name")
                                    or char.get("char_id", ""),
                                    "hair": char.get("hair", ""),
                                    "facial_hair": char.get("facial_hair", ""),
                                    "clothing": char.get("wardrobe")
                                    or char.get("clothing", ""),
                                }
                            )
                        expected_elements = inputs_snapshot.get("scene_elements", [])

                    intention_context = {
                        "character_anchor": inputs_snapshot.get("identity_anchor", "")
                        if inputs_snapshot
                        else "",
                        "scene_description": inputs_snapshot.get(
                            "scene_description", ""
                        )
                        if inputs_snapshot
                        else "",
                    }

                    critic = StartFrameCritic(
                        expected_background="scene",
                        character_descriptions=character_descriptions,
                        expected_elements=expected_elements,
                        shot_id=shot_id,
                        intention_context=intention_context,
                    )
                    _, sf_result = critic.run(str(start_frame))

                    # ERROR outcome (vision API down, infra failure during run) →
                    # route to pending_qc per JT's middle-option decision. Run continues,
                    # morning re-check script promotes or escalates.
                    if sf_result.errored:
                        logger.warning(
                            "Start frame validation ERRORED for %s: %s — routing to pending_qc",
                            shot_id,
                            sf_result.error,
                        )
                        self._store.update_shot(
                            shot_id,
                            status="pending_qc",
                            error_message=f"start_frame_critic ERROR: {sf_result.error}",
                            cost_incurred=cost,
                        )
                        return StepResult(
                            shot_id=shot_id,
                            success=False,
                            final_state="pending_qc",
                            output_path=None,
                            cost_usd=cost,
                            error=f"start_frame_critic ERROR: {sf_result.error}",
                            take_index=-1,
                            gate_verdict=None,
                            model=model,
                            pipeline="video",
                        )

                    # FAIL with hard failures → block video API call (saves Kling budget on doomed shots)
                    if sf_result.outcome == Outcome.FAIL:
                        hard_fails = [
                            f"{d.name}: {d.message}"
                            for d in sf_result.failed_dimensions
                            if d.severity == Severity.HARD
                        ]
                        if hard_fails:
                            logger.error(
                                "Start frame HARD FAIL for %s: %s — blocking video submit",
                                shot_id,
                                "; ".join(hard_fails),
                            )
                            self._store.update_shot(
                                shot_id,
                                status="video_semantic_failed",
                                error_message=f"Start frame hard failure: {'; '.join(hard_fails)}",
                                cost_incurred=cost,
                            )
                            return StepResult(
                                shot_id=shot_id,
                                success=False,
                                final_state="video_semantic_failed",
                                output_path=None,
                                cost_usd=cost,
                                error=f"Start frame hard failure: {'; '.join(hard_fails)}",
                                take_index=-1,
                                gate_verdict=None,
                                model=model,
                                pipeline="video",
                            )
                        else:
                            # Only soft failures — log and proceed per PC-1 keep-bias
                            soft_failed = [
                                f"{d.name}: {d.message}"
                                for d in sf_result.failed_dimensions
                            ]
                            logger.warning(
                                "Start frame soft-failed for %s: %s — proceeding per PC-1",
                                shot_id,
                                "; ".join(soft_failed),
                            )
                            self._store.update_shot(
                                shot_id,
                                validation_notes=f"Start frame soft failures: {'; '.join(soft_failed)}",
                            )
                    else:
                        logger.info("Start frame validation passed for %s", shot_id)

                except Exception as e:
                    # Infrastructure error (import, file missing). Route to pending_qc
                    # so the morning re-check can retry — don't lose the shot.
                    logger.error(
                        "Start frame validation infra error for %s: %s", shot_id, e
                    )
                    self._store.update_shot(
                        shot_id,
                        status="pending_qc",
                        error_message=f"Start frame validation crashed: {e}",
                    )
                    return StepResult(
                        shot_id=shot_id,
                        success=False,
                        final_state="pending_qc",
                        output_path=None,
                        cost_usd=cost,
                        error=str(e),
                        take_index=-1,
                        gate_verdict=None,
                        model=model,
                        pipeline="video",
                    )

            # CP-2 spec-review edit #8 — payload.hints schema. Stamp modality
            # at the StepRunner boundary so resolve_adapter() never sees an
            # unannotated payload. _dict_to_unified() preserves the hints
            # dict on the UnifiedVideoPayload.
            # Phase C: typed StepRunnerHints construction at this write site.
            existing_hints = coerce_to_dict(payload.get("hints"))
            normalized_hints = {
                k: v
                for k, v in existing_hints.items()
                if k in StepRunnerHints.model_fields
            }
            normalized_hints["modality"] = "video"
            normalized_hints = self._stamp_context_hints(normalized_hints)
            payload["hints"] = StepRunnerHints(**normalized_hints)
            assert coerce_to_dict(payload.get("hints")).get("modality") == "video", (
                f"payload.hints missing modality "
                f"(got keys: {list(coerce_to_dict(payload.get('hints')).keys())})"
            )

            # CP-2 Phase 5 — provider-aware dispatch via VideoModelClient.
            # The legacy get_client factory in api_client lives on until
            # Phase 8 (the deletion gate); execute_video no longer calls it.
            from recoil.execution.video_model_client import VideoModelClient

            # REC-38: the unified_payload carries 'hints' (StepRunnerHints), not
            # 'provider_hints' (that was the execute_video param, already merged in).
            # Reading the wrong key meant tier was always None → tier strategies
            # (UPGRADE_FAST_TO_PRO etc.) never reached VideoModelClient.
            _tier_hint = coerce_to_dict(payload.get("hints")).get("tier")
            client = VideoModelClient(model_id=model, tier=_tier_hint)
            job = client.submit(payload)
            # Wan generation takes ~6-7 min; Kling ~2-3 min; Veo ~1-2 min
            timeout_s = 720 if model.startswith("wan-") else 1800
            result = client.wait_for_job(job, timeout_s=timeout_s, on_status=on_status)
            cost = result.cost or 0.0

            # 4. Handle failure
            if not result.success:
                self._store.update_shot(
                    shot_id,
                    status="video_failed",
                    error_message=result.error or "Video generation failed",
                    cost_incurred=cost,
                )
                if self._cost_logger:
                    self._cost_logger.record(shot_id, cost, model, "video")
                return StepResult(
                    shot_id=shot_id,
                    success=False,
                    final_state="video_failed",
                    output_path=None,
                    cost_usd=cost,
                    error=result.error or "Video generation failed",
                    take_index=-1,
                    gate_verdict=None,
                    model=model,
                    pipeline="video",
                )

            # 5. Save video (pre-compute take number to keep filename and store in sync)
            if grouping_filename_args is not None:
                take_num = next_take_number(
                    video_dir=self._paths.video_dir,
                    **grouping_filename_args,
                )
            else:
                take_num = self._next_take_number(shot_id)
            # Phase 6 (M+K): pass unified_payload so _save_video writes
            # the .mp4.json sidecar with refs_used populated.
            video_path = self._save_video(
                shot_id,
                result,
                take_number=take_num,
                model=model,
                unified_payload=unified_payload,
                grouping=grouping,
                grouping_filename_args=grouping_filename_args,
            )

            # 5b. Video frame validation (POST-GEN, blocking on hard failures)
            if self._validate_frames and video_path:
                try:
                    from recoil.core.paths import ensure_pipeline_importable

                    ensure_pipeline_importable()
                    from recoil.pipeline._lib.critics.video_frame_critic import (
                        VideoFrameCritic,
                    )

                    vf_critic = VideoFrameCritic(
                        character_type="human",
                        expected_style="",
                        num_frames=3,
                    )
                    _, vf_result = vf_critic.run(str(video_path))

                    if vf_result.errored:
                        # Vision API down — route to pending_qc, run continues
                        logger.warning(
                            "Video validation ERRORED for %s: %s — routing to pending_qc",
                            shot_id,
                            vf_result.error,
                        )
                        self._store.update_shot(
                            shot_id,
                            status="pending_qc",
                            error_message=f"video_frame_critic ERROR: {vf_result.error}",
                            cost_incurred=cost,
                        )
                        if self._cost_logger:
                            self._cost_logger.record(shot_id, cost, model, "video")
                        return StepResult(
                            shot_id=shot_id,
                            success=False,
                            final_state="pending_qc",
                            output_path=str(video_path),
                            cost_usd=cost,
                            error=f"video_frame_critic ERROR: {vf_result.error}",
                            take_index=take_num,
                            gate_verdict=None,
                            model=model,
                            pipeline="video",
                        )

                    if vf_result.outcome == Outcome.FAIL and vf_result.hard_failures:
                        failed = [
                            f"{d.name}: {d.message}" for d in vf_result.hard_failures
                        ]
                        logger.error(
                            "Video validation HARD FAIL for %s: %s",
                            shot_id,
                            "; ".join(failed),
                        )
                        self._store.update_shot(
                            shot_id,
                            status="video_semantic_failed",
                            error_message=f"Video hard failure: {'; '.join(failed)}",
                            cost_incurred=cost,
                        )
                        if self._cost_logger:
                            self._cost_logger.record(shot_id, cost, model, "video")
                        return StepResult(
                            shot_id=shot_id,
                            success=False,
                            final_state="video_semantic_failed",
                            output_path=str(video_path),
                            cost_usd=cost,
                            error=f"Video hard failure: {'; '.join(failed)}",
                            take_index=take_num,
                            gate_verdict=None,
                            model=model,
                            pipeline="video",
                        )

                    # Soft failures or pass — proceed per PC-1
                    if vf_result.soft_failures:
                        soft = [
                            f"{d.name}: {d.message}" for d in vf_result.soft_failures
                        ]
                        logger.info(
                            "Video soft-failed for %s: %s — proceeding per PC-1",
                            shot_id,
                            "; ".join(soft),
                        )
                        self._store.update_shot(
                            shot_id,
                            validation_notes=f"Video soft failures: {'; '.join(soft)}",
                        )
                    else:
                        logger.info("Video validation passed for %s", shot_id)
                except Exception as e:
                    logger.error(
                        "Video validation infrastructure error for %s: %s", shot_id, e
                    )
                    # Fail safe — go to pending_qc, don't lose the work
                    self._store.update_shot(
                        shot_id,
                        status="pending_qc",
                        error_message=f"Video validation crashed: {e}",
                    )
                    return StepResult(
                        shot_id=shot_id,
                        success=False,
                        final_state="pending_qc",
                        output_path=str(video_path),
                        cost_usd=cost,
                        error=str(e),
                        take_index=take_num,
                        gate_verdict=None,
                        model=model,
                        pipeline="video",
                    )

            # 6. Transition to video_ready
            self._store.update_shot(shot_id, status="video_ready")

            # 7. Run gates (if any)
            gate_verdict = None
            any_deferred = False
            deferred_reasons = []
            if gates:
                shot_data = self._store.get_shot(shot_id) or {}
                for gate_fn in gates:
                    try:
                        verdict = gate_fn(video_path, shot_data)
                    except Exception as gate_err:
                        gate_name = (
                            getattr(gate_fn, "__name__", None) or type(gate_fn).__name__
                        )
                        verdict = GateVerdict(
                            passed=True,
                            deferred=True,
                            gate_name=gate_name,
                            reason=f"gate crashed: {gate_err}",
                        )
                    gate_verdict = verdict
                    if verdict.deferred:
                        any_deferred = True
                        deferred_reasons.append(str(verdict.reason or "deferred"))
                    if not verdict.passed:
                        cost += verdict.cost
                        break
                    cost += verdict.cost

            # 7b. Handle gate failure
            if gate_verdict and not gate_verdict.passed:
                fail_state = (
                    "video_mechanical_failed"
                    if gate_verdict.retriable
                    else "video_semantic_failed"
                )
                self._store.update_shot(shot_id, status=fail_state)
                rel_path = self._relative_path(video_path)
                take_record = {
                    "take_number": take_num,
                    "file_path": rel_path,
                    "prompt_used": prompt,
                    "cost_usd": cost,
                    "timestamp": time.time(),
                    "model": model,
                    "pipeline": "video",
                    "gate_verdict": {
                        "passed": False,
                        "gate_name": gate_verdict.gate_name,
                        "reason": gate_verdict.reason,
                    },
                    "disposition": "rejected",
                    "inputs_snapshot": inputs_snapshot,
                    "parent_take_id": parent_take_id,
                }
                self._store.append_take(shot_id, take_record)
                self._store.update_shot(shot_id, cost_incurred=cost)
                if self._cost_logger:
                    self._cost_logger.record(shot_id, cost, model, "video")
                return StepResult(
                    shot_id=shot_id,
                    success=False,
                    final_state=fail_state,
                    output_path=str(video_path),
                    cost_usd=cost,
                    error=gate_verdict.reason,
                    take_index=take_num,
                    gate_verdict=gate_verdict,
                    model=model,
                    pipeline="video",
                )

            # 8. Transition to video_complete
            self._store.update_shot(shot_id, status="video_complete")

            # 9. Build take record and append
            rel_path = self._relative_path(video_path)
            take_record = {
                "take_number": take_num,
                "file_path": rel_path,
                "prompt_used": prompt,
                "cost_usd": cost,
                "timestamp": time.time(),
                "model": model,
                "pipeline": "video",
                "gate_verdict": {
                    "passed": gate_verdict.passed,
                    "gate_name": gate_verdict.gate_name,
                    "reason": gate_verdict.reason,
                }
                if gate_verdict
                else None,
                "disposition": None,  # Set by approve/reject later
                "inputs_snapshot": inputs_snapshot,
                "parent_take_id": parent_take_id,
            }
            take_idx = self._store.append_take(shot_id, take_record)

            # Update output_path and cost
            update_fields = {
                "output_path": rel_path,
                "cost_incurred": cost,
                "gate_results": {"video_path": rel_path},
            }
            if any_deferred:
                update_fields["deferred"] = True
                update_fields["deferred_reason"] = "; ".join(deferred_reasons)
            self._store.update_shot(shot_id, **update_fields)

            # 10. Log cost
            if self._cost_logger:
                self._cost_logger.record(shot_id, cost, model, "video")

            logger.info("Shot %s: video complete → %s ($%.3f)", shot_id, rel_path, cost)

            # R6 Phase 4 — universal sidecar via populate_sidecar SSOT.
            # Replaces write_pipeline_sidecar_RETIRED. Refs aggregation preserved
            # verbatim from the prior block (reference_images + start_frame
            # de-duped). Receipt=None because no GenerationReceipt is in scope
            # at this call site; cost+seed propagate via payload (Opus R3 Q3).
            try:
                from recoil.pipeline._lib.sidecar import (
                    populate_sidecar,
                    write_sidecar_dict,
                )

                # Phase 6.5 leak fix (preserved) — refs from dispatch payload.
                _refs_for_sidecar: list[str] = []
                if reference_images:
                    _refs_for_sidecar.extend(str(r) for r in reference_images)
                if start_frame is not None:
                    _sf = str(start_frame)
                    if _sf not in _refs_for_sidecar:
                        _refs_for_sidecar.append(_sf)

                _seed = None
                _provider = None
                if result is not None and getattr(result, "metadata", None):
                    _seed = (result.metadata or {}).get("seed")
                    _provider = (result.metadata or {}).get("provider")

                _sidecar_payload = {
                    "video_path": str(video_path),
                    "model": model,
                    "modality": "video_i2v" if start_frame is not None else "video_t2v",
                    "prompt": prompt,
                    "duration": duration,
                    "duration_s": duration,
                    "cost_usd": cost,
                    "seed": _seed,
                }
                if isinstance(grouping, dict):
                    _sidecar_payload["grouping"] = dict(grouping)
                _gate_results_dict = (
                    {
                        g.gate_name: ("pass" if g.passed else "fail")
                        for g in [gate_verdict]
                        if g
                    }
                    if gate_verdict
                    else {}
                )
                _inputs_snapshot_hash = (
                    inputs_snapshot.get("inputs_snapshot_hash")
                    if isinstance(inputs_snapshot, dict)
                    else None
                )
                _location_id = (
                    inputs_snapshot.get("location_id")
                    if isinstance(inputs_snapshot, dict)
                    else None
                )

                _sidecar_dict = populate_sidecar(
                    receipt=None,
                    payload=_sidecar_payload,
                    refs_used=_refs_for_sidecar,
                    gate_results=_gate_results_dict,
                    pipeline="video_i2v" if start_frame is not None else "video_t2v",
                    dispatch_path=self._dispatch_path or "unknown",
                    shot_id=shot_id,
                    generation_params={
                        "duration": duration,
                        "aspect_ratio": aspect_ratio,
                        "mode": "image2video"
                        if start_frame is not None
                        else "text2video",
                    },
                    inputs_snapshot_hash=_inputs_snapshot_hash,
                    location_id=_location_id,
                    provider_adapter=_provider,
                )
                _sc_path = video_path.with_suffix(video_path.suffix + ".json")
                write_sidecar_dict(_sc_path, _sidecar_dict)
            except Exception as e:
                logger.warning("Sidecar write failed for %s: %s", shot_id, e)

            # Write YAML generation metadata sidecar for SeedDance
            if "seeddance" in model:
                try:
                    _write_seeddance_meta_sidecar(
                        video_path=video_path,
                        shot_id=shot_id,
                        model=model,
                        prompt=prompt,
                        duration=duration,
                        aspect_ratio=aspect_ratio,
                        video_mode="image2video"
                        if start_frame is not None
                        else "text2video",
                        cost_usd=cost,
                        latency_seconds=time.time() - _gen_start_time,
                        start_frame=start_frame,
                        reference_images=reference_images,
                    )
                except Exception as e:
                    logger.warning(
                        "YAML meta sidecar write failed for %s: %s", shot_id, e
                    )

            vid_take_id = _synthesize_take_id(take_record, shot_id, take_idx)

            return StepResult(
                shot_id=shot_id,
                success=True,
                final_state="video_complete",
                output_path=rel_path,
                cost_usd=cost,
                error=None,
                take_index=take_idx,
                gate_verdict=gate_verdict,
                model=model,
                pipeline="video",
                take_id=vid_take_id,
            )

        except Exception as e:
            if isinstance(e, RuntimeError) and str(e) == _FFMPEG_UNAVAILABLE_MESSAGE:
                raise
            # Catch-all: try to set status to video_failed
            logger.error("Shot %s: execute_video failed: %s", shot_id, e)
            try:
                self._store.update_shot(
                    shot_id,
                    status="video_failed",
                    error_message=str(e),
                    cost_incurred=cost,
                )
            except Exception as _store_err:
                fire_sanctioned_fallback(
                    "step_runner_outer_failure_store_update_skip",
                    scope="step_runner.execute_video",
                    error=str(_store_err),
                )
            return StepResult(
                shot_id=shot_id,
                success=False,
                final_state="video_failed",
                output_path=None,
                cost_usd=cost,
                error=str(e),
                take_index=-1,
                gate_verdict=None,
                model=model,
                pipeline="video",
            )

    def execute_multi_shot(
        self,
        batch: list[dict],
        multi_prompt_sequence: list[dict],
        model: str = "kling-v3",
        start_frame: Optional[Path] = None,
        aspect_ratio: str = "9:16",
        elements_payload: Optional[dict] = None,
        cfg_scale: Optional[float] = None,
        shot_prompts: Optional[list[dict]] = None,
    ) -> list[StepResult]:
        """Execute a multi-prompt batch generation. Blocks until complete.

        Submits one multi-prompt API call, waits for completion, slices
        the output video into individual clips, and updates the execution
        store for each shot.

        Args:
            batch: List of plan shot dicts (with _api_duration, shot_id).
            multi_prompt_sequence: List of {index, prompt, duration} from
                build_multi_prompt_sequence.
            model: Model ID (default: kling-v3).
            start_frame: Optional path to start frame image for I2V.
            aspect_ratio: Video aspect ratio.

        Returns:
            List of StepResults, one per shot in the batch.
        """
        shot_ids = [s.get("shot_id", f"unknown_{i}") for i, s in enumerate(batch)]
        shot_durations = [s.get("_api_duration", 5) for s in batch]
        total_duration = sum(shot_durations)
        results: list[StepResult] = []

        # Check if this model supports native multi-shot (MultiShotPayload).
        # CP-2 Phase 8/R4: "fal_ai_kling" removed — KlingAdapter doesn't accept
        # MultiShotPayload (drops shots/shot_ids/start_frame_bytes via
        # _dict_to_unified). Phase-8-stubbed get_client routes through
        # VideoModelClient which only accepts UnifiedVideoPayload-shaped dicts.
        # The fall-through `_execute_sequential_shots` path is the working route
        # for fal-ai-hosted Kling. CP-3 will revisit native multi-shot via
        # KlingAdapter once UnifiedVideoPayload.hints["multi_shots"] is wired.
        api_pattern = model_profiles.get_api_pattern(model)
        _MULTI_SHOT_PATTERNS = {"kling_rest"}

        if api_pattern not in _MULTI_SHOT_PATTERNS:
            # Sequential single-shot fallback for models that don't support
            # multi-prompt sequences (Veo 3.1, SeedDance, etc.)
            return self._execute_sequential_shots(
                batch=batch,
                multi_prompt_sequence=multi_prompt_sequence,
                model=model,
                start_frame=start_frame,
                aspect_ratio=aspect_ratio,
                elements_payload=elements_payload,
                cfg_scale=cfg_scale,
            )

        # NATIVE MULTI-SHOT (kling_rest) — UNWIRED, fail loud instead of
        # silently dropping shots.
        #
        # The former native branch built a MultiShotPayload and submitted it
        # through get_client(model) -> VideoModelClient, which only accepts
        # UnifiedVideoPayload-shaped dicts and drops shots/shot_ids/
        # start_frame_bytes via _dict_to_unified. The call "succeeded" while
        # silently losing every shot. Raise here so a caller never loses work.
        #
        # FUTURE NATIVE WIRE-UP GOES HERE: re-enable native multi-shot once
        # UnifiedVideoPayload.hints["multi_shots"] is wired through
        # KlingAdapter (CP-3). The get_api_pattern / _MULTI_SHOT_PATTERNS
        # routing seam above is preserved as the entry point. See divergence
        # multi_shot_native_vs_sequential.
        raise NotImplementedError(
            "Kling native multi-shot dispatch is not wired "
            f"(api_pattern={api_pattern!r}); MultiShotPayload would be "
            "silently dropped by VideoModelClient (shots/start_frame_bytes "
            "lost). Use the sequential path (r2v_multi) instead. Re-enable "
            "when hints['multi_shots'] is wired through KlingAdapter — see "
            "divergence multi_shot_native_vs_sequential."
        )

    def execute_multi_shot_with_takes(
        self,
        batch: list[dict],
        multi_prompt_sequence: list[dict],
        takes_count: int = 1,
        budget_remaining: float | None = None,
        **kwargs,
    ) -> list[list[StepResult]]:
        """Execute multiple takes of the same multi-shot pass.

        Calls execute_multi_shot N times with the same parameters.
        Each take produces a separate video. Stops early if budget
        would be exceeded.

        Args:
            batch: List of plan shot dicts.
            multi_prompt_sequence: List of {index, prompt, duration}.
            takes_count: Number of takes to generate (default 1).
            budget_remaining: Optional budget cap. If the next take
                would exceed this, stop generating.
            **kwargs: Passed through to execute_multi_shot (model,
                start_frame, aspect_ratio, elements_payload, cfg_scale).

        Returns:
            List of lists — one inner list per take, each containing
            StepResults from execute_multi_shot.
        """
        takes_count = max(1, min(takes_count, 5))  # Hard cap at 5
        all_takes: list[list[StepResult]] = []

        # Estimate cost per take
        model = kwargs.get("model", "kling-v3")
        cost_per_second = model_profiles.get_cost(model)
        total_duration = sum(
            s.get("duration", s.get("_api_duration", 5)) for s in multi_prompt_sequence
        )
        estimated_cost = cost_per_second * total_duration

        for take_num in range(takes_count):
            # Budget guard
            if budget_remaining is not None:
                if estimated_cost > budget_remaining:
                    logger.info(
                        "Budget cap reached after %d/%d takes (remaining: $%.2f, next take: $%.2f)",
                        take_num,
                        takes_count,
                        budget_remaining,
                        estimated_cost,
                    )
                    break

            results = self.execute_multi_shot(
                batch=batch,
                multi_prompt_sequence=multi_prompt_sequence,
                **kwargs,
            )
            all_takes.append(results)

            # Deduct actual cost from budget
            actual_cost = sum(r.cost_usd for r in results)
            if budget_remaining is not None:
                budget_remaining -= actual_cost

        return all_takes

    def _execute_sequential_shots(
        self,
        batch: list[dict],
        multi_prompt_sequence: list[dict],
        model: str,
        start_frame: Optional[Path] = None,
        aspect_ratio: str = "9:16",
        elements_payload: Optional[dict] = None,
        cfg_scale: Optional[float] = None,
    ) -> list[StepResult]:
        """Sequential single-shot fallback for models without multi-prompt support.

        Submits each shot individually as a dict payload. Used for Veo 3.1
        and other models that don't accept MultiShotPayload.
        """
        shot_ids = [s.get("shot_id", f"unknown_{i}") for i, s in enumerate(batch)]
        results: list[StepResult] = []
        cost_per_second = model_profiles.get_cost(model)
        client = get_client(model)
        total_cost = 0.0

        for i, (shot, seq_entry) in enumerate(zip(batch, multi_prompt_sequence)):
            sid = shot_ids[i]
            duration = shot.get("_api_duration", 5)
            prompt = seq_entry.get("prompt", "")
            shot_cost = cost_per_second * duration

            try:
                self._store.update_shot(sid, status="video_submitted")
            except Exception as _store_err:
                fire_sanctioned_fallback(
                    "step_runner_batch_update_skip",
                    scope="step_runner.execute_sequence_sequential.submitted",
                    error=str(_store_err),
                )

            # Build single-shot dict payload
            payload = {
                "model": model,
                "prompt": prompt,
                "duration": duration,
                "aspect_ratio": aspect_ratio,
            }

            # First shot gets start frame if provided
            if i == 0 and start_frame and start_frame.exists():
                payload["start_frame"] = str(start_frame)

            try:
                job = client.submit(payload)
                gen_result = client.wait_for_job(job, timeout_s=720)
            except Exception as e:
                try:
                    self._store.update_shot(
                        sid,
                        status="video_failed",
                        error_message=f"Sequential shot failed: {e}",
                    )
                except Exception as _store_err:
                    fire_sanctioned_fallback(
                        "step_runner_nested_cleanup_skip",
                        scope="step_runner.execute_sequence_sequential.shot_failed",
                        error=str(_store_err),
                    )
                results.append(
                    StepResult(
                        shot_id=sid,
                        success=False,
                        final_state="video_failed",
                        output_path=None,
                        cost_usd=0.0,
                        error=str(e),
                        take_index=-1,
                        gate_verdict=None,
                        model=model,
                        pipeline="sequence_sequential",
                    )
                )
                continue

            if not gen_result.success:
                try:
                    self._store.update_shot(
                        sid,
                        status="video_failed",
                        error_message=gen_result.error or "Generation failed",
                        cost_incurred=shot_cost,
                    )
                except Exception as _store_err:
                    fire_sanctioned_fallback(
                        "step_runner_nested_cleanup_skip",
                        scope="step_runner.execute_sequence_sequential.gen_failed",
                        error=str(_store_err),
                    )
                total_cost += shot_cost
                results.append(
                    StepResult(
                        shot_id=sid,
                        success=False,
                        final_state="video_failed",
                        output_path=None,
                        cost_usd=shot_cost,
                        error=gen_result.error,
                        take_index=-1,
                        gate_verdict=None,
                        model=model,
                        pipeline="sequence_sequential",
                    )
                )
                continue

            # Save video and register take (pre-compute take number for sync)
            take_num = self._next_take_number(sid)
            video_path = self._save_video(
                sid, gen_result, take_number=take_num, model=model
            )
            total_cost += shot_cost

            try:
                self._store.update_shot(sid, status="video_complete")
                rel_path = self._relative_path(video_path)
                take_record = {
                    "take_number": take_num,
                    "file_path": rel_path,
                    "prompt_used": prompt[:200],
                    "cost_usd": shot_cost,
                    "timestamp": time.time(),
                    "model": model,
                    "pipeline": "sequence_sequential",
                    "total_duration": duration,
                    "disposition": None,
                }
                self._store.append_take(sid, take_record)
                self._store.update_shot(sid, cost_incurred=shot_cost)
            except Exception as e:
                logger.error("Failed to update store for %s: %s", sid, e)

            if self._cost_logger:
                self._cost_logger.record(sid, shot_cost, model, "sequence_sequential")

            success = video_path.exists()
            results.append(
                StepResult(
                    shot_id=sid,
                    success=success,
                    final_state="video_complete" if success else "video_failed",
                    output_path=str(video_path) if success else None,
                    cost_usd=shot_cost,
                    error=None if success else "Video save failed",
                    take_index=-1,
                    gate_verdict=None,
                    model=model,
                    pipeline="sequence_sequential",
                )
            )

            logger.info(
                "Sequential shot %d/%d (%s): %s — $%.3f",
                i + 1,
                len(batch),
                sid,
                "OK" if success else "FAIL",
                shot_cost,
            )

        logger.info(
            "Sequential batch complete: %d/%d shots succeeded, $%.3f total",
            sum(1 for r in results if r.success),
            len(results),
            total_cost,
        )
        return results

    def _resolve_look_bundle(self, target_model: str, inputs_snapshot: dict | None):
        """Resolve the project's bound Look + Identities into a LookBundle.

        krea2-flora Phase 2. Reads the project's ``project_config.json`` (via
        ``self._paths.project_config_path``) for the ``look`` / ``identities``
        keys, resolves them through the Phase-1 resolver, and builds a
        budgeted ``LookBundle`` for ``target_model``.

        Returns ``None`` (a complete no-op for the caller) whenever:
          * there is no ProjectPaths / project_config.json on disk, OR
          * the project has no ``look`` binding (the case for EVERY project
            until Phase 3 wires real bindings).

        ``project_config`` is NOT plumbed through ``execute_keyframe``'s
        signature, so this method loads it from disk and wraps it in a tiny
        ``{"project_config": <cfg>}`` shim — exactly the shape the Phase-1
        ``resolve_look`` / ``resolve_identity`` accept (they read either a
        ``.project_config`` attr OR a ``project_config`` dict key).
        """
        import json as _json

        from recoil.pipeline._lib.look_loader import (
            build_look_bundle,
            resolve_identity,
            resolve_look,
            validate_project_binding,
        )

        # Load project_config.json from the project paths (best-effort).
        cfg: dict = {}
        paths = getattr(self, "_paths", None)
        cfg_path = getattr(paths, "project_config_path", None) if paths else None
        # StepRunner's _paths is the EPISODE-SCOPED step_types.ProjectPaths,
        # which exposes `project_root` but NOT the `project_config_path`
        # property (only core.paths.ProjectPaths has it). Without this fallback
        # the look would NEVER resolve on the real render path — the feature
        # would be silently dead. Derive the canonical location
        # (project_root/project_config.json) when the attr is absent.
        if cfg_path is None and paths is not None:
            project_root = getattr(paths, "project_root", None)
            if project_root is not None:
                cfg_path = Path(project_root) / "project_config.json"
        if cfg_path is not None and Path(cfg_path).is_file():
            try:
                cfg = _json.loads(Path(cfg_path).read_text(encoding="utf-8"))
            except (OSError, ValueError):
                cfg = {}

        # krea2-flora Phase 3 — enforce look XOR cinema_mode at the dispatch
        # path too (step_runner reads the config raw, bypassing
        # load_project_config). A contradictory binding must fail loud here
        # rather than silently apply a look on top of a project cinema_mode.
        validate_project_binding(cfg)

        ctx = {"project_config": cfg}

        # No look binding → complete no-op (back-compat for all current projects).
        look = resolve_look(ctx)
        if look is None:
            return None

        # Resolve an Identity per character key carried in inputs_snapshot.
        identities: list[dict] = []
        seen: set = set()
        chars = (inputs_snapshot or {}).get("characters") or []
        for char in chars:
            key = (
                char.get("char_id") or char.get("display_name")
                if isinstance(char, dict)
                else char
            )
            if not key:
                continue
            ident = resolve_identity(ctx, key)
            if ident is not None:
                ident_id = ident.get("identity_id")
                if ident_id not in seen:
                    seen.add(ident_id)
                    identities.append(ident)

        return build_look_bundle(look, identities, target_model)

    def execute_keyframe(
        self,
        shot_id: str,
        prompt: str,
        model: str,
        scene_ref_path: Optional[Path] = None,
        pose_ref_path: Optional[Path] = None,
        identity_refs: Optional[list[Path]] = None,
        expression_refs: Optional[list[Path]] = None,
        aspect_ratio: str = "9:16",
        gates: Optional[list[GateFunction]] = None,
        max_gate_retries: int = 3,
        inputs_snapshot: dict | None = None,
        reference_images: Optional[list[Path]] = None,
        quality: Optional[str] = None,
        size_override: Optional[str] = None,
        source_frame: Optional[Path] = None,
        save_dir: Optional[Path | str] = None,
        filename_stem: Optional[str] = None,
        sidecar_extra: Optional[dict] = None,
    ) -> StepResult:
        """Execute a keyframe generation step with gate retry loop.

        krea2-flora Phase 4 — reference-conditioned edit pass:
        When ``source_frame`` is provided, it flows into the
        UnifiedVideoPayload's ``image`` field as the i2i/is2i base
        (start) frame. The Flora adapter's ``_infer_action`` then
        promotes the image action from ``t2i`` to ``i2i``/``is2i``
        (mirroring the video t2v→i2v/r2v promotion) and emits the
        image ref field. Combined with the resolved LookBundle's
        identity/style refs (already injected via ``reference_images``),
        this re-renders an EXISTING frame through the bound Look/Identity
        — serving both drift-repair and previz-restyle on the same path.
        OPTIONAL: when ``source_frame`` is None (default), ``image`` stays
        None and behavior is byte-identical to a normal t2i keyframe.

        Lifecycle:
        1. Transition to keyframe_generating
        2. Generate image via API
        3. Save keyframe
        4. Run gates — Gate 1 (mechanical) is retriable, Gate 2 (semantic) is not
        5. On all gates pass: keyframe_generated, append take
        6. On gate fail: keyframe_mechanical_failed or keyframe_semantic_failed
        """
        if (save_dir is None) != (filename_stem is None):
            raise ValueError("save_dir and filename_stem must be set together")
        save_dir = Path(save_dir) if save_dir is not None else None

        total_cost = 0.0
        attempts = 0
        feedback_agent = None  # Lazy-init only when gates fail

        try:
            # 1. Transition to keyframe_generating
            self._store.update_shot(shot_id, status="keyframe_generating")

            from recoil.execution.assembler import (
                KeyframeRefBundle,
                compile_keyframe_parts,
            )

            # CP-2 Phase 6 — provider-aware dispatch via GoogleAdapter.
            # The legacy get_client factory in api_client lives on until
            # Phase 8 (the deletion gate); execute_keyframe no longer calls it.
            from recoil.execution.providers import resolve_adapter, UnifiedVideoPayload

            # Build intention context for validation gates
            intention_ctx = {
                "generation_prompt": prompt,
                "character_anchor": "",
                "scene_description": "",
                "narrative_context": "",
            }
            if inputs_snapshot:
                anchor = inputs_snapshot.get("identity_anchor", "")
                if anchor:
                    intention_ctx["character_anchor"] = anchor

            # ── krea2-flora Phase 2: Look/Identity injection ──────────────
            # Resolve the project's bound Look/Identity, build a LookBundle for
            # the target model, and inject its refs + aspect into the keyframe
            # payload. BACK-COMPAT: when the project has NO `look` binding
            # (every project until Phase 3 wires real bindings), `resolve_look`
            # returns None and this entire block is a complete no-op — behavior
            # is byte-identical to before. The whole block is guarded on
            # `look is not None`.
            from recoil.pipeline._lib.look_loader import ProjectBindingError

            look_bundle = None
            try:
                look_bundle = self._resolve_look_bundle(model, inputs_snapshot)
            except ProjectBindingError:
                # A contradictory binding (look XOR cinema_mode violated) is an
                # operator config error — fail LOUD here rather than silently
                # rendering with the wrong/no look. This is the ONLY XOR
                # enforcement on the render path (it reads project_config raw,
                # not via load_project_config), so it must not be swallowed.
                raise
            except Exception as e:  # other look-resolution hiccups never break dispatch
                logger.warning("execute_keyframe look resolution skipped: %s", e)
                look_bundle = None

            if look_bundle is not None:
                from recoil.pipeline._lib.look_loader import REF_ROOT

                # Identity refs FIRST, then style refs; resolve each
                # CONFIG_DIR-relative ResolvedRef.path to an absolute path so
                # the Flora adapter's _upload_local_refs can upload it.
                bundle_ref_paths = [
                    str((REF_ROOT / r.path).resolve())
                    for r in (look_bundle.identity_refs + look_bundle.style_refs)
                ]
                if bundle_ref_paths:
                    # Merge bundle refs (identity-first) ahead of caller refs,
                    # de-duplicating by resolved absolute path so a ref present
                    # in BOTH the Look and the caller args isn't uploaded/sent
                    # twice (cost + unpredictable model effect). Order-preserving
                    # via dict.fromkeys; bundle refs keep priority.
                    caller_paths = [
                        str(Path(p).resolve()) for p in (reference_images or [])
                    ]
                    deduped = list(dict.fromkeys(bundle_ref_paths + caller_paths))
                    reference_images = [Path(p) for p in deduped]
                # Aspect from the look (look's aspect_default wins when bound).
                if look_bundle.aspect:
                    aspect_ratio = look_bundle.aspect
                # Inject the TEXTUAL half of the Look into the prompt. The image
                # builders expose an apply_look hook, but they are compiled
                # UPSTREAM without the bundle (the bundle is only resolved here,
                # at dispatch time), so the textual fragments would never reach
                # the model. Apply them here where the bundle is live. avoid[]
                # rides as positive prose — never a negative_prompt. Re-enforce
                # length afterward (best-effort; the base prompt was already
                # length-checked by its builder).
                from recoil.pipeline._lib.prompt_engine import (
                    _enforce_prompt_length,
                    apply_look,
                )

                prompt = apply_look(prompt, look_bundle)
                prompt = _enforce_prompt_length(prompt, model)
                # Stash on the runner for sidecar / downstream consumers.
                self._look_bundle = look_bundle
                # Tag the cost logger / provenance with look_id + ref_budget.
                _look_prov = look_bundle.provenance.get("look_id")
                logger.info(
                    "execute_keyframe: LookBundle injected — look_id=%s ref_budget=%s",
                    _look_prov,
                    look_bundle.ref_budget,
                )
                if self._cost_logger and hasattr(self._cost_logger, "tag"):
                    try:
                        self._cost_logger.tag(
                            shot_id,
                            look_id=_look_prov,
                            ref_budget=look_bundle.ref_budget,
                        )
                    except Exception:  # tagging is best-effort provenance only
                        pass

            # Gate retry loop
            for attempt in range(max_gate_retries + 1):
                attempts += 1

                # Build model-agnostic ref bundle
                bundle = KeyframeRefBundle(
                    prompt=prompt,
                    model=model,
                    aspect_ratio=aspect_ratio,
                    scene_ref=scene_ref_path,
                    pose_ref=pose_ref_path,
                    identity_refs=identity_refs or [],
                    expression_refs=expression_refs or [],
                    inputs_snapshot=inputs_snapshot,
                )

                # Translate the bundle into a UnifiedVideoPayload with
                # hints["modality"]="image" and hints["parts"] = the assembler-built
                # parts list. genai_config matches GoogleGenaiClient.generate_keyframe
                # (api_client.py:602) so behavior is byte-identical at the API call.
                parts = compile_keyframe_parts(bundle)
                # Phase C: typed StepRunnerHints. genai_config rides via
                # extra="allow" (it's a Google-specific key that lives on
                # GoogleHints; StepRunnerHints accepts it as extra).
                # Flat reference_images list — for fal-routed multi-ref image
                # models (gpt-image-2) whose adapter consumes
                # payload.reference_images directly, not via the Google parts
                # list. Wired 2026-05-25 to fix the silent ref drop in the
                # image_t2i dispatch chain: prior to this, the only ref path
                # reaching the model was via KeyframeRefBundle → compile_
                # keyframe_parts → hints.parts (Google-shape only). Multi-ref
                # gpt-image-2 i2i calls were arriving at fal as pure t2i.
                _flat_refs = (
                    [str(p) for p in reference_images] if reference_images else None
                )
                # Forward quality through hints; the fal adapter's
                # _resolve_gpt_image_2_quality reads it via the payload.
                _hints_kwargs: dict = {
                    "modality": "image",
                    "parts": parts,
                    "genai_config": {
                        "response_modalities": ["IMAGE", "TEXT"],
                        "image_config": {"aspect_ratio": aspect_ratio},
                    },
                }
                if quality:
                    _hints_kwargs["quality"] = quality
                # krea2-flora Phase 5: the Krea `creativity` dial + `seed` ride
                # via hints so the Flora image adapter (_build_params) emits them
                # into params. Source precedence: explicit per-call inputs_snapshot
                # (the `concept` CLI passes these) over the bound Look's default
                # `creativity`. StepRunnerHints is extra="allow", so these pass
                # through coerce_to_dict() into the Flora params untyped. Without
                # this, the concept creativity dial + look creativity were
                # silently dropped (passed but never forwarded to the payload).
                _creativity = None
                if inputs_snapshot and inputs_snapshot.get("creativity"):
                    _creativity = inputs_snapshot["creativity"]
                elif look_bundle is not None and getattr(
                    look_bundle, "creativity", None
                ):
                    _creativity = look_bundle.creativity
                if _creativity:
                    _hints_kwargs["creativity"] = _creativity
                if inputs_snapshot and inputs_snapshot.get("seed") is not None:
                    _hints_kwargs["seed"] = inputs_snapshot["seed"]
                # gpt-image-2 size_override (2K/4K) rides via hints so the
                # adapter's _resolve_gpt_image_2_size_with_hints picks it up.
                # The validator in the adapter rejects out-of-spec sizes
                # (>3840 edge, >8.29 MP, non-16-multiples, AR>3:1) by falling
                # back to the aspect-ratio preset map with a WARNING — callers
                # don't have to pre-validate.
                if size_override:
                    _hints_kwargs["size_override"] = size_override
                # krea2-flora Phase 4: the edit-pass base frame. When set,
                # this is the EXISTING frame being re-rendered; it rides as
                # payload.image (the i2i/is2i start/base frame) so the Flora
                # adapter promotes t2i→i2i/is2i and emits the image ref field.
                # When None (the t2i default), payload.image stays None and
                # _infer_action returns t2i exactly as before.
                _source_frame_val = str(source_frame) if source_frame else None
                unified = UnifiedVideoPayload(
                    prompt=prompt,
                    aspect_ratio=aspect_ratio,
                    duration_s=0,
                    resolution="720p",
                    shot_id=shot_id,
                    model_id=model,
                    image=_source_frame_val,
                    reference_images=_flat_refs,
                    hints=StepRunnerHints(**_hints_kwargs),
                )

                # Resolve adapter + dispatch via direct_submit_image (synchronous bypass).
                # Google's image API doesn't fit the submit/poll shape so we skip
                # VideoModelClient and call the adapter directly per CP-2 spec.
                try:
                    adapter, _tier = resolve_adapter(model, unified)
                    raw = adapter.direct_submit_image(unified)
                    image_bytes = raw.get("image_bytes")
                    if image_bytes:
                        result = GenerationResult(
                            success=True,
                            image_data=image_bytes,
                            model=model,
                            cost=raw.get("cost_usd") or model_profiles.get_cost(model),
                        )
                    else:
                        result = GenerationResult(
                            success=False,
                            model=model,
                            error="No image_bytes in adapter result",
                        )
                except Exception as e:
                    logger.error("execute_keyframe adapter dispatch failed: %s", e)
                    result = GenerationResult(
                        success=False,
                        model=model,
                        error=str(e),
                    )
                attempt_cost = result.cost or 0.0
                total_cost += attempt_cost

                # 3. Handle API failure
                if not result.success:
                    self._store.update_shot(
                        shot_id,
                        status="keyframe_mechanical_failed",
                        error_message=result.error or "Keyframe generation failed",
                        cost_incurred=total_cost,
                    )
                    if self._cost_logger:
                        self._cost_logger.record(shot_id, total_cost, model, "still")
                    return StepResult(
                        shot_id=shot_id,
                        success=False,
                        final_state="keyframe_mechanical_failed",
                        output_path=None,
                        cost_usd=total_cost,
                        error=result.error,
                        take_index=-1,
                        gate_verdict=None,
                        model=model,
                        pipeline="still",
                    )

                # 4. Save keyframe (with take number so retries don't overwrite)
                take_num = (
                    1 if save_dir is not None else self._next_take_number(shot_id)
                )
                keyframe_path = self._save_keyframe(
                    shot_id,
                    result,
                    take_number=take_num,
                    save_dir=save_dir,
                    filename_stem=filename_stem,
                )

                # 5. Run gates
                gate_verdict = None
                gate_failed = False
                any_deferred = False
                deferred_reasons = []
                if gates:
                    shot_data = self._store.get_shot(shot_id) or {}
                    for gate_fn in gates:
                        verdict = gate_fn(keyframe_path, shot_data)
                        gate_verdict = verdict
                        attempt_cost += verdict.cost
                        total_cost += verdict.cost
                        if verdict.deferred:
                            any_deferred = True
                            deferred_reasons.append(str(verdict.reason or "deferred"))
                        if not verdict.passed:
                            gate_failed = True
                            if verdict.retriable and attempt < max_gate_retries:
                                # Keep failed frame for review, retry with next take number
                                logger.info(
                                    "Shot %s: gate %s failed (retriable), attempt %d/%d — kept %s",
                                    shot_id,
                                    verdict.gate_name,
                                    attempt + 1,
                                    max_gate_retries,
                                    keyframe_path.name,
                                )
                                take_record = {
                                    "take_number": take_num,
                                    "file_path": self._relative_path(keyframe_path),
                                    "prompt_used": prompt,
                                    "cost_usd": attempt_cost,
                                    "timestamp": time.time(),
                                    "model": model,
                                    "pipeline": "still",
                                    "gate_verdict": {
                                        "passed": False,
                                        "gate_name": verdict.gate_name,
                                        "reason": verdict.reason,
                                    },
                                    "disposition": "rejected",
                                    "inputs_snapshot": inputs_snapshot,
                                }
                                self._store.append_take(shot_id, take_record)
                                break  # Break inner gate loop, continue outer retry loop
                            else:
                                # Non-retriable or max retries reached
                                fail_state = (
                                    "keyframe_semantic_failed"
                                    if "semantic" in verdict.gate_name
                                    or not verdict.retriable
                                    else "keyframe_mechanical_failed"
                                )
                                self._store.update_shot(
                                    shot_id,
                                    status=fail_state,
                                    error_message=verdict.reason,
                                    cost_incurred=total_cost,
                                )
                                if self._cost_logger:
                                    self._cost_logger.record(
                                        shot_id, total_cost, model, "still"
                                    )
                                take_record = {
                                    "take_number": take_num,
                                    "file_path": self._relative_path(keyframe_path),
                                    "prompt_used": prompt,
                                    "cost_usd": attempt_cost,
                                    "timestamp": time.time(),
                                    "model": model,
                                    "pipeline": "still",
                                    "gate_verdict": {
                                        "passed": False,
                                        "gate_name": verdict.gate_name,
                                        "reason": verdict.reason,
                                    },
                                    "disposition": "rejected",
                                    "inputs_snapshot": inputs_snapshot,
                                }
                                take_idx = self._store.append_take(shot_id, take_record)
                                return StepResult(
                                    shot_id=shot_id,
                                    success=False,
                                    final_state=fail_state,
                                    output_path=self._relative_path(keyframe_path),
                                    cost_usd=total_cost,
                                    error=verdict.reason,
                                    take_index=take_idx,
                                    gate_verdict=gate_verdict,
                                    model=model,
                                    pipeline="still",
                                )
                            break  # Only process up to first failed gate

                if gate_failed:
                    # --- Feedback-assisted retry ---
                    if feedback_agent is None:
                        feedback_agent = FeedbackAgent(
                            project_id=self._store.project
                            if hasattr(self._store, "project")
                            else "unknown",
                            engine_memory_path=self._paths.project_root.parent.parent
                            / "recoil"
                            / "engine-memory"
                            if hasattr(self, "_paths")
                            else None,
                        )
                    fix = feedback_agent.diagnose(
                        verdict=gate_verdict,
                        prompt_sections=[],
                        current_refs=identity_refs or [],
                        ref_metadata={},
                        failed_output_path=keyframe_path,
                        attempt_number=attempt + 1,
                        modality="keyframe",
                        target_model=model,
                    )

                    if fix:
                        # Apply negative prompt additions to the prompt text
                        if fix.negative_prompt_additions:
                            neg_text = ", ".join(fix.negative_prompt_additions)
                            prompt = prompt + f"\n\n[NEGATIVE: {neg_text}]"
                            logger.info("Feedback: appended negative prompt guidance")
                        # Apply ref changes
                        if fix.ref_changes and fix.ref_changes.has_removals:
                            if fix.ref_changes.keep_only and identity_refs:
                                identity_refs = identity_refs[:1]  # Keep hero ref only
                                logger.info("Feedback: pruned refs to hero only")
                        # Log the attempt
                        feedback_agent.log_attempt(
                            shot_id,
                            FeedbackAttempt(
                                attempt_number=attempt + 1,
                                strategy=fix.strategy,
                                gate_failed=gate_verdict.gate_name,
                                failure_reason=gate_verdict.reason[:200],
                                ref_changes_applied=str(fix.ref_changes)
                                if fix.ref_changes
                                else None,
                                result="pending",
                                output_path=str(keyframe_path),
                                cost=fix.diagnosis_cost,
                            ),
                        )
                        logger.info(
                            "Feedback: applying %s (confidence %.2f) for attempt %d",
                            fix.strategy.value,
                            fix.confidence,
                            attempt + 2,
                        )
                    else:
                        # No fix available — generate autopsy and ESCALATE immediately.
                        # Continuing the retry loop here would re-run the same broken
                        # prompt+refs, burning budget. (Audit finding H, 2026-04-09.)
                        logger.info(
                            "Feedback: no fix available for %s. Escalating to ICU.",
                            shot_id,
                        )
                        autopsy = feedback_agent.generate_autopsy(
                            {"shot_id": shot_id},
                            gate_verdict,
                        )
                        # Save autopsy to project ICU directory
                        try:
                            from recoil.execution.feedback.autopsy import (
                                save_autopsy_to_project,
                            )

                            save_autopsy_to_project(autopsy, self._paths.project_root)
                        except Exception as e:
                            logger.warning("Could not save autopsy: %s", e)

                        self._store.update_shot(
                            shot_id,
                            status="icu_escalated",
                            error_message="No feedback fix available",
                            cost_incurred=total_cost,
                        )
                        if self._cost_logger:
                            self._cost_logger.record(
                                shot_id, total_cost, model, "still"
                            )
                        return StepResult(
                            shot_id=shot_id,
                            success=False,
                            final_state="icu_escalated",
                            output_path=self._relative_path(keyframe_path),
                            cost_usd=total_cost,
                            error="No feedback fix available — see ICU autopsy",
                            take_index=take_num,
                            gate_verdict=gate_verdict,
                            model=model,
                            pipeline="still",
                        )

                    continue  # Retry only if a fix WAS applied

                # 6. All gates passed (or no gates) — keyframe_generated
                self._store.update_shot(shot_id, status="keyframe_generated")

                # 7. Append take
                rel_path = self._relative_path(keyframe_path)
                take_record = {
                    "take_number": take_num,
                    "file_path": rel_path,
                    "prompt_used": prompt,
                    "cost_usd": attempt_cost,
                    "timestamp": time.time(),
                    "model": model,
                    "pipeline": "still",
                    "gate_verdict": {
                        "passed": gate_verdict.passed,
                        "gate_name": gate_verdict.gate_name,
                    }
                    if gate_verdict
                    else None,
                    "disposition": None,
                    "inputs_snapshot": inputs_snapshot,
                }
                take_idx = self._store.append_take(shot_id, take_record)

                kf_take_id = _synthesize_take_id(take_record, shot_id, take_idx)

                kf_update = {
                    "output_path": rel_path,
                    "cost_incurred": total_cost,
                    "attempts": attempts,
                    "gate_results": {"keyframe_path": rel_path},
                }
                if any_deferred:
                    kf_update["deferred"] = True
                    kf_update["deferred_reason"] = "; ".join(deferred_reasons)
                self._store.update_shot(shot_id, **kf_update)

                if self._cost_logger:
                    self._cost_logger.record(shot_id, total_cost, model, "still")

                logger.info(
                    "Shot %s: keyframe generated → %s ($%.3f, %d attempts)",
                    shot_id,
                    rel_path,
                    total_cost,
                    attempts,
                )

                # R6 Phase 5 — universal sidecar via populate_sidecar SSOT.
                # Replaces write_pipeline_sidecar_RETIRED. Refs aggregation
                # preserved verbatim (scene + pose + identity + expression
                # refs, de-duped). Receipt=None — keyframe site has no
                # GenerationReceipt in scope; cost+seed via payload.
                # NOTE: video_path key holds keyframe_path because the
                # modern schema calls the artifact path `video_path` for
                # both image_t2i and video_i2v.
                try:
                    from recoil.pipeline._lib.sidecar import (
                        compute_sha256,
                        populate_sidecar,
                        write_sidecar_dict,
                    )

                    _kf_refs_for_sidecar = _aggregate_keyframe_refs(
                        scene_ref_path,
                        pose_ref_path,
                        identity_refs,
                        expression_refs,
                        reference_images,
                    )

                    _seed = None
                    _provider = None
                    if result is not None and getattr(result, "metadata", None):
                        _seed = (result.metadata or {}).get("seed")
                        _provider = (result.metadata or {}).get("provider")
                    _gate_results_dict = (
                        {
                            g.gate_name: ("pass" if g.passed else "fail")
                            for g in [gate_verdict]
                            if g
                        }
                        if gate_verdict
                        else {}
                    )
                    _inputs_snapshot_hash = (
                        inputs_snapshot.get("inputs_snapshot_hash")
                        if isinstance(inputs_snapshot, dict)
                        else None
                    )
                    _location_id = (
                        inputs_snapshot.get("location_id")
                        if isinstance(inputs_snapshot, dict)
                        else None
                    )

                    _sidecar_payload = {
                        "video_path": str(
                            keyframe_path
                        ),  # modern schema key — yes, "video_path" for keyframes
                        "model": model,
                        "modality": "image_t2i",
                        "prompt": prompt,
                        "cost_usd": total_cost,
                        "seed": _seed,
                    }

                    _sidecar_dict = populate_sidecar(
                        receipt=None,
                        payload=_sidecar_payload,
                        refs_used=_kf_refs_for_sidecar,
                        gate_results=_gate_results_dict,
                        pipeline="image_t2i",
                        dispatch_path=self._dispatch_path or "unknown",
                        shot_id=shot_id,
                        generation_params={
                            "aspect_ratio": aspect_ratio,
                            "attempts": attempts,
                        },
                        inputs_snapshot_hash=_inputs_snapshot_hash,
                        location_id=_location_id,
                        provider_adapter=_provider,
                    )
                    _sc_path = keyframe_path.with_suffix(keyframe_path.suffix + ".json")
                    if sidecar_extra:
                        _sidecar_dict.update(sidecar_extra)
                    _sidecar_dict["sha256"] = compute_sha256(keyframe_path)
                    write_sidecar_dict(_sc_path, _sidecar_dict)
                except Exception as e:
                    logger.warning("Sidecar write failed for %s: %s", shot_id, e)

                return StepResult(
                    shot_id=shot_id,
                    success=True,
                    final_state="keyframe_generated",
                    output_path=rel_path,
                    cost_usd=total_cost,
                    error=None,
                    take_index=take_idx,
                    gate_verdict=gate_verdict,
                    model=model,
                    pipeline="still",
                    take_id=kf_take_id,
                )

            # Exhausted retries without returning — shouldn't happen but safety net
            self._store.update_shot(
                shot_id,
                status="keyframe_mechanical_failed",
                error_message="Max gate retries exhausted",
            )
            return StepResult(
                shot_id=shot_id,
                success=False,
                final_state="keyframe_mechanical_failed",
                output_path=None,
                cost_usd=total_cost,
                error="Max gate retries exhausted",
                take_index=-1,
                gate_verdict=gate_verdict,
                model=model,
                pipeline="still",
            )

        except Exception as e:
            logger.error("Shot %s: execute_keyframe failed: %s", shot_id, e)
            try:
                self._store.update_shot(
                    shot_id,
                    status="keyframe_mechanical_failed",
                    error_message=str(e),
                    cost_incurred=total_cost,
                )
            except Exception as _store_err:
                fire_sanctioned_fallback(
                    "step_runner_outer_failure_store_update_skip",
                    scope="step_runner.execute_keyframe",
                    error=str(_store_err),
                )
            return StepResult(
                shot_id=shot_id,
                success=False,
                final_state="keyframe_mechanical_failed",
                output_path=None,
                cost_usd=total_cost,
                error=str(e),
                take_index=-1,
                gate_verdict=None,
                model=model,
                pipeline="still",
            )

    def transition(
        self,
        shot_id: str,
        to_state: str,
        reason: str = "",
    ) -> None:
        """Transition a shot to a new state. For external state changes.

        Used by Console for approve/reject/skip/unlock. The ExecutionStore
        validates the transition against VALID_TRANSITIONS and raises
        InvalidTransitionError if the transition is not allowed.

        Args:
            shot_id: The shot to transition.
            to_state: Target state (must be valid from current state).
            reason: Audit trail string (logged, not stored unless force_reset).
        """
        if reason:
            logger.info(
                "Shot %s: transition to %s (reason: %s)", shot_id, to_state, reason
            )
        self._store.update_shot(shot_id, status=to_state)

    def _save_keyframe(
        self,
        shot_id: str,
        result,
        take_number: int = 1,
        save_dir: Optional[Path | str] = None,
        filename_stem: Optional[str] = None,
    ) -> Path:
        """Save keyframe image with take numbering.

        Uses take_number to version output files, so gate retries don't
        overwrite prior takes.  Convention:
          {PROJECT}_EP{NNN}_S{NN}_take{N}.png
        """
        if (save_dir is None) != (filename_stem is None):
            raise ValueError("save_dir and filename_stem must be set together")

        if save_dir is not None and filename_stem is not None:
            output_dir = Path(save_dir)
            output_dir.mkdir(parents=True, exist_ok=True)
            output_path = output_dir / f"{filename_stem}.png"
        else:
            frames_dir = self._paths.frames_dir
            frames_dir.mkdir(parents=True, exist_ok=True)

            label = _make_shot_label_for_keyframe(shot_id, self._paths.project)
            if label is not None:
                filename = f"{label}_take{take_number}.png"
            else:
                filename = f"{shot_id}_take{take_number}.png"
            output_path = frames_dir / filename

        if result.image_data:
            output_path.write_bytes(result.image_data)
        else:
            raise ValueError(f"GenerationResult for {shot_id} has no image_data")

        logger.info("Saved keyframe: %s", output_path.name)
        return output_path

    def _write_sidecar(
        self,
        *,
        video_path: Path,
        run_result,  # RunResult or GenerationResult — both supported via getattr
        unified_payload: dict,
        gate_results: dict | None = None,
        prompt_layers: dict | None = None,
        # R6 Phase 6 — callers MUST pass pipeline / shot_id when known.
        # Defaults preserve back-compat; new code passes real values.
        pipeline: str | None = None,
        shot_id: str | None = None,
    ) -> None:
        """Write a .mp4.json sidecar next to the video using the canonical helper.

        R4 Phase 3 (A4): inline sidecar build replaced by populate_sidecar(...)
        from recoil.pipeline._lib.sidecar. Single source of truth for the
        sidecar contract — both single-shot dispatch (dispatch_cli) and
        r2v_multi (this writer) now produce identical schema.

        R6 Phase 6: accepts `pipeline` and `shot_id` from callers so the
        populate_sidecar provenance is correct. Falls back to "unknown" /
        None when caller does not supply them.
        """
        from recoil.pipeline._lib.sidecar import (
            populate_sidecar,
            write_sidecar_dict,
        )

        sidecar_path = video_path.with_suffix(video_path.suffix + ".json")
        payload = dict(unified_payload)
        payload["video_path"] = str(video_path)
        sidecar_dict = populate_sidecar(
            receipt=run_result,
            payload=payload,
            gate_results=gate_results,
            prompt_layers=prompt_layers,
            tag=unified_payload.get("tag"),
            project=unified_payload.get("project"),
            pipeline=pipeline or unified_payload.get("modality") or "unknown",
            shot_id=shot_id or unified_payload.get("shot_id"),
            dispatch_path=getattr(self, "_dispatch_path", None) or "unknown",
        )
        write_sidecar_dict(sidecar_path, sidecar_dict)

    def _video_grouping_filename_args(
        self,
        shot_id: str,
        grouping: dict | None,
    ) -> dict | None:
        """Return canonical grouped filename args, or None for tokenless solo."""
        if not isinstance(grouping, dict):
            return None

        strategy = grouping.get("strategy")
        if not strategy:
            return None
        if strategy == "solo":
            ordinal = grouping.get("ordinal")
            if ordinal not in (None, 0):
                raise ValueError(
                    "execute_video solo grouping must use ordinal 0 or None; "
                    f"got {grouping!r}"
                )
            return None

        ordinal = grouping.get("ordinal")
        shot_ids = grouping.get("shot_ids")
        if ordinal is None or not isinstance(shot_ids, list) or not shot_ids:
            raise ValueError(
                "execute_video requires grouping identity with strategy, ordinal, "
                f"and shot_ids for grouped filenames; got {grouping!r}"
            )

        ep_match = None
        for candidate in [*shot_ids, shot_id]:
            ep_match = re.search(r"EP(\d+)", str(candidate))
            if ep_match:
                break
        if not ep_match:
            raise ValueError(
                "execute_video grouped filename requires at least one EP<NNN> "
                f"token in grouping shot_ids or shot_id; got {grouping!r}"
            )

        return {
            "episode": int(ep_match.group(1)),
            "strategy": str(strategy),
            "ordinal": int(ordinal),
            "shot_ids": [str(sid) for sid in shot_ids],
        }

    def _save_video(
        self,
        shot_id: str,
        result,
        take_number: int | None = None,
        *,
        model: str,
        unified_payload: dict | None = None,
        grouping: dict | None = None,
        grouping_filename_args=_GROUPING_FILENAME_ARGS_UNSET,
    ) -> Path:
        """Save video data to versioned take file. Returns absolute path.

        Filenames use the canonical naming grammar (Phase 3 naming-reset):
        PASS_000 sentinel for legacy single-shot writes, tag SOLO_ENV.

        Args:
            shot_id: Shot identifier.
            result: GenerationResult with video_data or video_url.
            take_number: If provided, use this take number instead of scanning
                the filesystem. Prevents desync between filename and store record.
            model: model_id for the filename_id slot. REQUIRED — no default
                (Phase 3 Bug H fix; "unknown" sentinel eliminated).
            unified_payload: Optional payload dict (with prompt, reference_images,
                duration, aspect_ratio, etc.) used by Phase 6 (M+K) to write the
                .mp4.json sidecar with refs_used populated. When None, the
                sidecar write is skipped (legacy callers continue to function).
        """
        video_dir = self._paths.video_dir
        video_dir.mkdir(parents=True, exist_ok=True)

        # Parse shot_id to extract episode and shot token.
        m = re.match(r"(?:EP(\d+)_)?SH(\d+)([A-Za-z]*)", shot_id)
        if m:
            ep_num = int(m.group(1)) if m.group(1) else 1
            ep_str = f"EP{ep_num:03d}"
            shot_token = f"{int(m.group(2))}{m.group(3).lower()}"
            shot_id_canonical = f"{ep_str}_SH{shot_token.upper()}"
            _shot_id_conformant = True
        else:
            ep_num = 1
            shot_id_canonical = shot_id
            shot_token = shot_id.replace("EP", "").replace("_SH", "_")
            _shot_id_conformant = False

        # Solo tag: SOLO_ENV. The legacy single path does not encode character anchor.
        tag = "SOLO_ENV"  # noqa: F841  pre-existing; unrelated to REC-38

        # Compute take number.
        if grouping_filename_args is _GROUPING_FILENAME_ARGS_UNSET:
            grouping_filename_args = self._video_grouping_filename_args(
                shot_id,
                grouping,
            )
        if take_number is not None:
            take_num = take_number
        elif grouping_filename_args is not None:
            take_num = next_take_number(video_dir, **grouping_filename_args)
        else:
            existing_vids = sorted(video_dir.glob(f"*_SH{shot_token}_*take*.mp4"))
            max_take = 0
            for vp in existing_vids:
                tm = re.search(r"_take(\d+)\.mp4$", vp.name)
                if tm:
                    max_take = max(max_take, int(tm.group(1)))
            take_num = max_take + 1

        # R4 SHORT grammar: project/tag/model live in the sidecar, not the filename.
        # pass_counter omitted for single-shot dispatch (Phase 1 default).
        if grouping_filename_args is not None:
            video_filename = build_filename(
                **grouping_filename_args,
                take=take_num,
            )
        elif _shot_id_conformant:
            video_filename = build_filename(
                episode=ep_num,
                shot_ids=[shot_id_canonical],
                take=take_num,
            )
        else:
            # REC-130: a standalone/probe dispatch whose shot_id has no EP_SH
            # grammar (e.g. "SEEDANCE_I2V_<ts>", "PROBE_*") cannot go through the
            # strict episode namer (build_filename asserts the SH grammar and
            # raises with "no SH<num> token"). Save the output under a sanitized
            # fallback filename so a real (paid) standalone render is downloadable
            # instead of crashing post-generation after the spend.
            _safe_stem = re.sub(r"[^A-Za-z0-9_-]+", "_", shot_id).strip("_") or "probe"
            video_filename = f"{_safe_stem}_take{take_num}.mp4"
        video_path = video_dir / video_filename

        # Write video data
        if result.video_data:
            video_path.write_bytes(result.video_data)
        elif result.video_url:
            urllib.request.urlretrieve(result.video_url, str(video_path))
        else:
            raise ValueError(
                f"GenerationResult for {shot_id} has neither video_data nor video_url"
            )

        logger.info("Saved video: %s", video_path.name)

        # Phase 6 (M+K): write .mp4.json sidecar with refs_used populated.
        # Only when the caller provided a unified_payload — legacy callers
        # without payload context still function (they relied on
        # write_pipeline_sidecar_RETIRED downstream).
        if unified_payload is not None:
            try:
                self._write_sidecar(
                    video_path=video_path,
                    run_result=result,
                    unified_payload=unified_payload,
                    gate_results=(
                        result.metadata.get("gate_results")
                        if (result and getattr(result, "metadata", None))
                        else None
                    ),
                    prompt_layers=unified_payload.get("prompt_layers"),
                )
            except Exception as e:
                logger.warning(
                    "Phase 6 sidecar write failed for %s: %s", video_path.name, e
                )
        return video_path

    def execute_previz(
        self,
        shot: dict,
        all_shots: list[dict],
        bible: dict = None,
        episode: int = 1,
        project: str = None,
        model: str = "gemini-3.1-flash-image-preview",
        aspect_ratio: str = "9:16",
    ) -> StepResult:
        """Execute a previz generation step. Blocks until complete.

        Uses the full 8-slot multimodal context from previz_context.py:
        behavioral preamble, scoped bible, 3-shot window, location moodboards,
        prop refs, character refs (hero + turnaround), expression refs, and
        the generative shot directive.

        Args:
            shot: The plan shot dict (with prompt_data, asset_data, etc.)
            all_shots: All shots in the episode (for 3-shot continuity window)
            bible: Global bible dict (for character/location descriptions)
            episode: Episode number (for wardrobe phase resolution)
            project: Project name (for ref path resolution)
            model: Image generation model
            aspect_ratio: Output aspect ratio
        """
        shot_id = shot.get("shot_id", "unknown")
        cost = 0.0
        try:
            # 1. Transition — handle various starting states
            current = (self._store.get_shot(shot_id) or {}).get("status", "")
            if current != "previs_generating":
                try:
                    self._store.update_shot(shot_id, status="previs_generating")
                except Exception as _store_err:
                    fire_sanctioned_fallback(
                        "step_runner_batch_update_skip",
                        scope="step_runner.execute_previz.transition",
                        error=str(_store_err),
                    )
                    self._store.force_reset_status(
                        shot_id,
                        "previs_generating",
                        reason="previz regen from StepRunner",
                    )

            # 2. Build full multimodal context (8-slot layout with refs)
            from recoil.core.paths import ensure_pipeline_importable

            ensure_pipeline_importable()
            from recoil.pipeline._lib.previz_context import build_previz_context
            from google.genai import types as genai_types

            context_parts = build_previz_context(
                shot=shot,
                all_shots=all_shots,
                bible=bible,
                episode=episode,
                project=project or self._paths.project,
            )

            # Build previz inputs snapshot (ensure_pipeline_importable already called above)
            from recoil.pipeline._lib.take_inputs import build_previz_inputs_snapshot
            from recoil.core.paths import get_pipeline_config as _get_cfg_for_snap

            _previz_temp_snap = _get_cfg_for_snap().get("previz_temperature", 0.4)
            inputs_snapshot = build_previz_inputs_snapshot(
                context_parts=context_parts,
                shot=shot,
                bible=bible or {},
                generation_params={
                    "temperature": _previz_temp_snap,
                    "aspect_ratio": aspect_ratio,
                    "model": model,
                },
                builder_name="build_previz_inputs_snapshot",
            )

            # Assemble content parts for Gemini
            contents = []
            for data, kind, text_or_label in context_parts:
                if data is None:
                    contents.append(genai_types.Part.from_text(text=text_or_label))
                else:
                    contents.append(
                        genai_types.Part.from_bytes(data=data, mime_type=kind)
                    )
                    if text_or_label:
                        contents.append(genai_types.Part.from_text(text=text_or_label))

            from recoil.core.paths import get_pipeline_config as _get_cfg

            _previz_temp = _get_cfg().get("previz_temperature", 0.4)

            # 3. Generate — CP-2 Phase 6: route via GoogleAdapter direct_submit_image.
            # The legacy abstraction-piercing path through api_client is gone;
            # provider-aware dispatch goes through resolve_adapter + adapter.
            from recoil.execution.providers import resolve_adapter, UnifiedVideoPayload

            # Phase C: typed StepRunnerHints. genai_config rides via
            # extra="allow" (Google-specific; arbitrary_types_allowed=True
            # accepts the genai_types.ImageConfig instance inside).
            unified = UnifiedVideoPayload(
                prompt=shot.get("compiled_prompts", {}).get("previs_flash", "")
                or shot.get("prompt", ""),
                aspect_ratio=aspect_ratio,
                duration_s=0,
                resolution="720p",
                shot_id=shot_id,
                model_id=model,
                hints=StepRunnerHints(
                    modality="image",
                    parts=contents,
                    genai_config={
                        "temperature": _previz_temp,
                        "response_modalities": ["IMAGE", "TEXT"],
                        "image_config": genai_types.ImageConfig(
                            aspect_ratio=aspect_ratio
                        ),
                    },
                ),
            )
            adapter, _tier = resolve_adapter(model, unified)
            raw = adapter.direct_submit_image(unified)

            cost = model_profiles.get_cost(model)

            image_data = raw.get("image_bytes")

            if not image_data:
                raise ValueError("No image data in Flash response")

            # 4. Save to previs directory
            previs_dir = self._paths.previs_dir
            previs_dir.mkdir(parents=True, exist_ok=True)

            m = re.match(r"(?:EP\d+_)?SH(\d+)([A-Z]*)", shot_id)
            if m:
                shot_label = f"{int(m.group(1)):03d}{m.group(2).lower()}"
            else:
                shot_label = shot_id.replace("EP", "").replace("_SH", "_")

            take_number = self._next_take_number(shot_id)
            if take_number == 0:
                filename = f"shot_{shot_label}.png"
            else:
                filename = f"shot_{shot_label}_take{take_number}.png"

            output_path = previs_dir / filename
            output_path.write_bytes(image_data)

            # 5. Transition + append take
            self._store.update_shot(shot_id, status="previs_generated")
            rel_path = self._relative_path(output_path)
            prompt_used = shot.get("compiled_prompts", {}).get("previs_flash", "")[:200]
            take_record = {
                "take_number": take_number,
                "file_path": rel_path,
                "prompt_used": prompt_used,
                "cost_usd": cost,
                "timestamp": time.time(),
                "model": model,
                "pipeline": "previz",
                "disposition": None,
                "inputs_snapshot": inputs_snapshot,
            }
            self._store.append_take(shot_id, take_record)
            self._store.update_shot(
                shot_id,
                output_path=rel_path,
                cost_incurred=cost,
            )

            if self._cost_logger:
                self._cost_logger.record(shot_id, cost, model, "previz")

            logger.info(
                "Shot %s: previz complete → %s ($%.3f)", shot_id, filename, cost
            )

            return StepResult(
                shot_id=shot_id,
                success=True,
                final_state="previs_generated",
                output_path=rel_path,
                cost_usd=cost,
                error=None,
                take_index=take_number,
                gate_verdict=None,
                model=model,
                pipeline="previz",
            )

        except Exception as e:
            logger.error("Shot %s: execute_previz failed: %s", shot_id, e)
            try:
                self._store.update_shot(
                    shot_id,
                    status="previs_failed",
                    error_message=str(e),
                    cost_incurred=cost,
                )
            except Exception as _store_err:
                fire_sanctioned_fallback(
                    "step_runner_outer_failure_store_update_skip",
                    scope="step_runner.execute_previz",
                    error=str(_store_err),
                )
            return StepResult(
                shot_id=shot_id,
                success=False,
                final_state="previs_failed",
                output_path=None,
                cost_usd=cost,
                error=str(e),
                take_index=-1,
                gate_verdict=None,
                model=model,
                pipeline="previz",
            )

    def _next_take_number(self, shot_id: str) -> int:
        """Get the next take number atomically.

        Strategy: try ExecutionStore.acquire_next_take_number() first
        (if available), which holds an internal lock. Fall back to a
        max(store_count, fs_max) scan if not.

        The filesystem scan now uses the canonical shot-label functions
        so it actually finds existing files for both keyframes and videos.
        """
        # Preferred path: atomic acquisition via store lock
        if hasattr(self._store, "acquire_next_take_number"):
            return self._store.acquire_next_take_number(shot_id)

        # Fallback: scan store + filesystem
        store_count = 0
        shot = self._store.get_shot(shot_id)
        if shot and shot.get("takes"):
            store_count = len(shot["takes"])

        # Filesystem scan using canonical labels.
        # Keyframes still use shot_label form ({PROJECT}_{EP}_S{NN}).
        # Video filenames now follow the canonical video_naming grammar
        # ({PROJECT}_{EP}_PASS_{CCC}_SH{token}_{TAG}_{model}_take{T}.mp4)
        # — match by SH token substring rather than by a precomputed label.
        fs_max = 0
        kf_label = _make_shot_label_for_keyframe(shot_id, self._paths.project)

        # Derive a shot-token substring (e.g. "SH5", "SH33a") for the video scan.
        _shot_m = re.match(r"(?:EP\d+_)?SH(\d+)([A-Za-z]*)", shot_id)
        if _shot_m:
            shot_tok = f"SH{int(_shot_m.group(1))}{_shot_m.group(2).lower()}"
        else:
            shot_tok = None

        # Keyframe scan — uses kf_label
        if (
            kf_label is not None
            and self._paths.frames_dir
            and self._paths.frames_dir.exists()
        ):
            for f in self._paths.frames_dir.iterdir():
                if kf_label in f.name:
                    tm = re.search(r"_take(\d+)\.\w+$", f.name)
                    if tm:
                        fs_max = max(fs_max, int(tm.group(1)))

        # Video scan — substring match on shot token. Conservative: matches
        # any filename containing _SH{tok}_ or _SH{tok}.something_.
        if shot_tok is not None:
            for search_dir in [self._paths.previs_dir, self._paths.video_dir]:
                if search_dir and search_dir.exists():
                    for f in search_dir.iterdir():
                        # `_SH{tok}_` followed by tag is the canonical join;
                        # also accept `_SH{tok}.` for adjacent boundary tokens.
                        if f"_{shot_tok}_" in f.name or f"_{shot_tok}." in f.name:
                            tm = re.search(r"_take(\d+)\.\w+$", f.name)
                            if tm:
                                fs_max = max(fs_max, int(tm.group(1)))

        # Always "one past the highest known" — Opus Finding 11 (2026-04-09)
        # fixed the previous off-by-one in the fallback branch.
        return max(store_count + 1, fs_max + 1, 1)

    def _relative_path(self, absolute_path: Path) -> str:
        """Convert absolute path to project-relative string."""
        try:
            return str(absolute_path.relative_to(self._paths.project_root))
        except ValueError:
            return str(absolute_path)

    def execute_wan_r2v(
        self,
        shot_id: str,
        prompt: str,
        reference_image_paths: list[Path],
        model: str = "wan-2.7-r2v",
        duration: int = 5,
        aspect_ratio: str = "9:16",
        multi_shots: bool = True,
        on_status=None,
    ) -> StepResult:
        """Execute a Wan 2.7 Reference-to-Video generation.

        Fundamentally different from execute_multi_shot() — Wan R2V
        delegates camera decisions to the model. The output is a single
        scene clip where the model decides internal cuts (if multi_shots=True).

        Reference images are uploaded to fal storage. Output is registered
        as a single take on the primary shot_id.
        """

        cost = 0.0
        try:
            self._store.update_shot(shot_id, status="video_submitted")
            self._store.update_shot(shot_id, status="video_processing")

            client = get_client(model)

            # Pass local paths through — WanAdapter._coerce_image_to_url
            # handles fal upload via FalTransport. CP-2 removed _upload_to_fal
            # from VideoModelClient; legacy upload-here path silently dropped
            # refs.
            ref_urls = [str(p) for p in reference_image_paths]

            cost_per_second = model_profiles.get_cost(model)
            total_cost = cost_per_second * duration

            # CP-2 Phase 8/R4: build a UnifiedVideoPayload-compatible dict
            # (reference_image_urls → reference_images; multi_shots → hints).
            # asdict() of WanR2VPayload would yield reference_image_urls which
            # _dict_to_unified silently drops — which crashes Wan as T2V.
            # Phase C: typed StepRunnerHints at the write site. multi_shots
            # is typed as Optional[list[Any]] on StepRunnerHints — here it's
            # a bool/list flag forwarded to WanAdapter.direct_subscribe_r2v.
            payload_dict = {
                "prompt": prompt,
                "reference_images": ref_urls,
                "duration": duration,
                "resolution": "720p",
                "aspect_ratio": aspect_ratio,
                # multi_shots is typed Optional[list] on StepRunnerHints — bool
                # input fails Pydantic validation. Coerce: list passes through,
                # everything else (bool/None) becomes None. Truthy bool callers
                # already use the WanAdapter's body["multi_shots"]=True path.
                "hints": StepRunnerHints(
                    modality="video",
                    multi_shots=multi_shots if isinstance(multi_shots, list) else None,
                ),
            }
            job = client.submit(payload_dict)
            # CP-2 Phase 8/R4: VideoModelClient.submit() does NOT block —
            # job.result is None until wait_for_job runs. The Wan adapter's
            # build_submit returns a sentinel URL (fal-sdk://wan-r2v); the
            # VideoModelClient sentinel handler intercepts and calls
            # adapter.direct_subscribe_r2v which IS blocking. wait_for_job
            # short-circuits the poll loop because the result is already
            # in native_state.
            result = client.wait_for_job(job, timeout_s=720)
            if result is None:
                result = GenerationResult(
                    success=False, model=model, error="No result from Wan submit"
                )
            cost = result.cost or total_cost

            if not result.success:
                self._store.update_shot(
                    shot_id,
                    status="video_failed",
                    error_message=result.error or "Wan R2V generation failed",
                    cost_incurred=cost,
                )
                return StepResult(
                    shot_id=shot_id,
                    success=False,
                    final_state="video_failed",
                    output_path=None,
                    cost_usd=cost,
                    error=result.error,
                    take_index=-1,
                    gate_verdict=None,
                    model=model,
                    pipeline="wan_r2v",
                )

            # Save video
            take_num = self._next_take_number(shot_id)
            video_path = self._save_video(
                shot_id, result, take_number=take_num, model=model
            )

            self._store.update_shot(shot_id, status="video_ready")

            # Append take (append_take takes a single dict, not kwargs)
            rel_path = self._relative_path(video_path) if video_path else None
            take_record = {
                "take_number": take_num,
                "file_path": rel_path,
                "prompt_used": prompt[:200],
                "cost_usd": cost,
                "timestamp": time.time(),
                "model": model,
                "pipeline": "wan_r2v",
                "multi_shots": multi_shots,
                "ref_count": len(ref_urls),
                "disposition": None,
            }
            self._store.append_take(shot_id, take_record)

            if self._cost_logger:
                self._cost_logger.record(shot_id, cost, model, "video")

            self._store.update_shot(shot_id, status="video_complete")

            return StepResult(
                shot_id=shot_id,
                success=True,
                final_state="video_complete",
                output_path=str(video_path),
                cost_usd=cost,
                error=None,
                take_index=take_num,
                gate_verdict=None,
                model=model,
                pipeline="wan_r2v",
            )

        except Exception as e:
            logger.error("execute_wan_r2v failed for %s: %s", shot_id, e)
            self._store.update_shot(
                shot_id,
                status="video_failed",
                error_message=str(e),
                cost_incurred=cost,
            )
            return StepResult(
                shot_id=shot_id,
                success=False,
                final_state="video_failed",
                output_path=None,
                cost_usd=cost,
                error=str(e),
                take_index=-1,
                gate_verdict=None,
                model=model,
                pipeline="wan_r2v",
            )

    # ------------------------------------------------------------------
    # Coverage-pass execution (Phase 4)
    # ------------------------------------------------------------------

    def execute_pass(
        self,
        pass_id: str,
        prompt: str,
        reference_image_paths: list[Path],
        segment_shot_ids: list[str],
        expected_segment_timestamps: list[tuple[float, float]],
        model: str = "seeddance-2.0",
        duration: int = 10,
        aspect_ratio: str = "9:16",
        resolution: str | None = None,
        gates: list[GateFunction] | None = None,
        on_status=None,
        tier: str | None = None,
        reference_video_paths: list[Path] | None = None,
        reference_audio_paths: list[Path] | None = None,
        generate_audio: bool = True,
        grouping: dict | None = None,
        pass_counter: int | None = None,
        tag: str | None = None,
        seed: int | None = None,
        forced_take_number: int | None = None,
        generation_config: dict | None = None,
        element_config: dict | None = None,
    ) -> PassResult:
        """Execute a coverage-pass generation via SeedDance R2V.

        Takes primitive types only — the caller (production_loop.py) is
        responsible for building the prompt and resolving reference paths.

        Supports both reference images (max 9, tagged @Image1..@Image9 in
        the prompt) and reference videos (max 3, tagged @Video1..@Video3).
        Videos guide motion / staging; images anchor identity.

        Lifecycle:
        1. Upload refs to fal storage
        2. Build SeedDance payload (plain dict with reference_images and,
           if provided, reference_videos keys)
        3. Submit async job via client.submit()
        4. Poll via client.wait_for_job() (900s timeout)
        5. On success: save video, extract boundary frames, run gates
        6. Build PassResult with per-segment SegmentResults
        7. Return PassResult
        """
        cost = 0.0

        if gates:
            grouping_for_probe = grouping
            if grouping_for_probe is None and pass_counter is not None:
                grouping_for_probe = {
                    "strategy": "coverage",
                    "ordinal": pass_counter,
                    "shot_ids": list(segment_shot_ids),
                    "source_pass_id": pass_id,
                }
            if isinstance(grouping_for_probe, dict):
                strategy_for_probe = grouping_for_probe.get("strategy")
                ordinal_for_probe = grouping_for_probe.get("ordinal")
            else:
                strategy_for_probe = getattr(grouping_for_probe, "strategy", None)
                ordinal_for_probe = getattr(grouping_for_probe, "ordinal", None)
            ep_match_for_probe = (
                re.search(r"EP(\d+)", segment_shot_ids[0]) if segment_shot_ids else None
            )
            try:
                ordinal_valid_for_probe = ordinal_for_probe is not None
                if ordinal_valid_for_probe:
                    int(ordinal_for_probe)
            except (TypeError, ValueError):
                ordinal_valid_for_probe = False
            if strategy_for_probe and ordinal_valid_for_probe and ep_match_for_probe:
                _ensure_ffmpeg_available()

        try:
            # CP-2 Phase 4 — provider-aware path via VideoModelClient.
            # Replaces legacy get_client + tier_override flow. Tier is ALWAYS
            # passed as a kwarg (spec-review edit #7); never via
            # RECOIL_PROVIDER_TIER_OVERRIDE — that env-var pattern races under
            # ProductionLoop's ThreadPoolExecutor. The legacy get_client
            # factory in api_client lives on until Phase 8 (the deletion gate).
            if grouping is None:
                if pass_counter is None:
                    raise ValueError(
                        "execute_pass requires grouping identity for r2v_multi "
                        "filename writing"
                    )
                grouping = {
                    "strategy": "coverage",
                    "ordinal": int(pass_counter),
                    "shot_ids": list(segment_shot_ids),
                    "source_pass_id": pass_id,
                }
            if isinstance(grouping, dict):
                strategy = grouping.get("strategy")
                ordinal = grouping.get("ordinal")
            else:
                strategy = getattr(grouping, "strategy", None)
                ordinal = getattr(grouping, "ordinal", None)
            if not strategy or ordinal is None:
                raise ValueError(
                    "execute_pass requires grouping identity with strategy and "
                    f"ordinal; got {grouping!r}"
                )
            int(ordinal)
            ep_match = (
                re.search(r"EP(\d+)", segment_shot_ids[0]) if segment_shot_ids else None
            )
            if not ep_match:
                raise ValueError(
                    f"segment_shot_ids[0]={segment_shot_ids[0] if segment_shot_ids else None!r} "
                    f"missing EP<NNN> token; cannot build canonical filename"
                )

            from recoil.execution.video_model_client import VideoModelClient

            client = VideoModelClient(model_id=model, tier=(tier or "standard_720p"))

            # 1. Pass local ref paths straight through — SeedDanceClient.submit()
            # handles the upload to fal storage internally (see api_client.py:1511–1528)
            # and the resulting CDN URLs are forwarded to Atlas Cloud too when
            # SEEDANCE_PROVIDER=atlas. Previous pre-upload via a hasattr check
            # silently dropped refs because SeedDanceClient has no _upload_to_fal.
            if on_status:
                try:
                    on_status("uploading_refs")
                except Exception as _cb_err:
                    fire_sanctioned_fallback(
                        "step_runner_post_step_finalize_skip",
                        scope="step_runner.execute_seedance_r2v.on_status_uploading",
                        error=str(_cb_err),
                    )

            ref_inputs = [str(p) for p in reference_image_paths]
            ref_video_inputs = [str(p) for p in (reference_video_paths or [])]
            ref_audio_inputs = [str(p) for p in (reference_audio_paths or [])]

            # 2. Build SeedDance payload — plain dict, NOT SeedDancePayload
            payload = {
                "prompt": prompt,
                "reference_images": ref_inputs,
                "duration": duration,
                "aspect_ratio": aspect_ratio,
                "generate_audio": generate_audio,
            }
            if ref_video_inputs:
                payload["reference_videos"] = ref_video_inputs
            if ref_audio_inputs:
                payload["reference_audios"] = ref_audio_inputs
            if resolution is not None:
                payload["resolution"] = resolution

            # CP-2 spec-review edit #8 — payload.hints schema. Stamp modality
            # at the StepRunner boundary so resolve_adapter() never sees an
            # unannotated payload. _dict_to_unified() preserves the hints
            # dict on the UnifiedVideoPayload.
            # Phase C: typed StepRunnerHints construction at this write site.
            existing_hints = coerce_to_dict(payload.get("hints"))
            normalized_hints = {
                k: v
                for k, v in existing_hints.items()
                if k in StepRunnerHints.model_fields
            }
            normalized_hints["modality"] = "video"
            if seed is not None:
                normalized_hints["seed"] = seed
            normalized_hints = self._stamp_context_hints(normalized_hints)
            payload["hints"] = StepRunnerHints(**normalized_hints)
            assert coerce_to_dict(payload.get("hints")).get("modality") == "video", (
                f"payload.hints missing modality "
                f"(got keys: {list(coerce_to_dict(payload.get('hints')).keys())})"
            )

            # Cost estimate (provider-aware). Fall back to legacy get_cost if the
            # provider/tier resolution is incomplete for this model — a cost
            # estimate must never crash a dispatch (Debug R1 / Gemini F1).
            try:
                cost_per_second = model_profiles.get_provider_cost_per_second(model)
            except (KeyError, ValueError):
                try:
                    cost_per_second = model_profiles.get_cost(model)
                except KeyError:
                    cost_per_second = 0.0
            estimated_cost = cost_per_second * duration

            # 3. Submit async job
            if on_status:
                try:
                    on_status("submitted")
                except Exception as _cb_err:
                    fire_sanctioned_fallback(
                        "step_runner_post_step_finalize_skip",
                        scope="step_runner.execute_seedance_r2v.on_status_submitted",
                        error=str(_cb_err),
                    )

            job = client.submit(payload)

            # If submit itself failed (e.g. billing error), bail early
            if job.result and not job.result.success:
                cost = job.result.cost or 0.0
                return PassResult(
                    pass_id=pass_id,
                    success=False,
                    video_path=None,
                    cost_usd=cost,
                    model=model,
                    error=job.result.error or "SeedDance submit failed",
                    expected_cuts=max(0, len(segment_shot_ids) - 1),
                )

            # 4. Poll until completion (900s timeout for multi-shot r2v)
            if on_status:
                try:
                    on_status("processing")
                except Exception as _cb_err:
                    fire_sanctioned_fallback(
                        "step_runner_post_step_finalize_skip",
                        scope="step_runner.execute_seedance_r2v.on_status_processing",
                        error=str(_cb_err),
                    )

            result = client.wait_for_job(job, timeout_s=900, on_status=on_status)

            if not result.success:
                # A failed run must NOT inherit the estimate: a pre-inference
                # rejection (e.g. BILLING_NOT_ENOUGH_CREDITS) was never billed,
                # and stamping estimated_cost here produced phantom spend that
                # inflated the BudgetGuard tally (REC-122). Only the provider's
                # own reported cost counts on failure.
                return PassResult(
                    pass_id=pass_id,
                    success=False,
                    video_path=None,
                    cost_usd=result.cost or 0.0,
                    model=model,
                    error=result.error or "SeedDance generation failed",
                    expected_cuts=max(0, len(segment_shot_ids) - 1),
                )

            cost = result.cost or estimated_cost

            # 5. Save video using canonical naming grammar.
            video_dir = self._paths.video_dir
            video_dir.mkdir(parents=True, exist_ok=True)

            if grouping is None:
                if pass_counter is None:
                    raise ValueError(
                        "execute_pass requires grouping identity for r2v_multi "
                        "filename writing"
                    )
                grouping = {
                    "strategy": "coverage",
                    "ordinal": int(pass_counter),
                    "shot_ids": list(segment_shot_ids),
                    "source_pass_id": pass_id,
                }
            if isinstance(grouping, dict):
                from types import SimpleNamespace

                identity = SimpleNamespace(
                    strategy=grouping.get("strategy"),
                    ordinal=grouping.get("ordinal"),
                )
            else:
                identity = grouping
            strategy = getattr(identity, "strategy", None)
            ordinal = getattr(identity, "ordinal", None)
            if not strategy or ordinal is None:
                raise ValueError(
                    "execute_pass requires grouping identity with strategy and "
                    f"ordinal; got {grouping!r}"
                )
            if not isinstance(grouping, dict):
                from types import SimpleNamespace

                identity = SimpleNamespace(strategy=strategy, ordinal=ordinal)
            identity.ordinal = int(ordinal)

            # Derive episode + canonical shot_ids from segment_shot_ids.
            ep_match = (
                re.search(r"EP(\d+)", segment_shot_ids[0]) if segment_shot_ids else None
            )
            if not ep_match:
                raise ValueError(
                    f"segment_shot_ids[0]={segment_shot_ids[0] if segment_shot_ids else None!r} "
                    f"missing EP<NNN> token; cannot build canonical filename"
                )
            ep_num_pp = int(ep_match.group(1))

            if forced_take_number is not None:
                take_num = int(forced_take_number)
            else:
                take_num = next_take_number(
                    video_dir,
                    episode=ep_num_pp,
                    strategy=identity.strategy,
                    ordinal=identity.ordinal,
                    shot_ids=list(segment_shot_ids),
                )

            # R4 SHORT grammar: project/tag/model live in sidecar.
            # No fallback: build_filename ValueErrors propagate to the caller.
            video_filename = build_filename(
                episode=ep_num_pp,
                strategy=identity.strategy,
                ordinal=identity.ordinal,
                shot_ids=list(segment_shot_ids),
                take=take_num,
            )
            video_path = video_dir / video_filename
            if result.video_data:
                video_path.write_bytes(result.video_data)
            elif result.video_url:
                urllib.request.urlretrieve(result.video_url, str(video_path))
            else:
                return PassResult(
                    pass_id=pass_id,
                    success=False,
                    video_path=None,
                    cost_usd=cost,
                    model=model,
                    error="No video data or URL in generation result",
                    expected_cuts=max(0, len(segment_shot_ids) - 1),
                )

            logger.info("Saved coverage pass video: %s", video_path.name)

            # Phase 6 (M+K Bug M): write .mp4.json sidecar with refs_used
            # populated for the r2v_multi path. Before Phase 6, execute_pass
            # produced NO sidecar at all — see audit_assertions §9.
            try:
                sidecar_payload = {
                    "shot_id": pass_id,
                    "prompt": prompt,
                    "reference_images": ref_inputs,
                    "duration_s": duration,
                    "model": model,
                    "modality": "r2v_multi",
                    "project": self._paths.project,
                    "grouping": dict(grouping)
                    if isinstance(grouping, dict)
                    else {
                        "strategy": identity.strategy,
                        "ordinal": identity.ordinal,
                        "shot_ids": list(segment_shot_ids),
                    },
                    "segment_shot_ids": list(segment_shot_ids),
                }
                if isinstance(generation_config, dict):
                    sidecar_payload["generation_config"] = dict(generation_config)
                if isinstance(element_config, dict):
                    sidecar_payload["element_config"] = dict(element_config)
                self._write_sidecar(
                    video_path=video_path,
                    run_result=result,
                    unified_payload=sidecar_payload,
                    gate_results=(
                        result.metadata.get("gate_results")
                        if (result and getattr(result, "metadata", None))
                        else None
                    ),
                    prompt_layers=None,
                    pipeline="r2v_multi",
                    shot_id=pass_id,
                )
            except Exception as e:
                logger.warning(
                    "Phase 6 r2v_multi sidecar write failed for %s: %s",
                    video_path.name,
                    e,
                )

            # 6. Extract boundary frames for gate evaluation
            boundary_frames = self._extract_boundary_frames(
                video_path,
                expected_segment_timestamps,
            )

            # 7. Run gates on extracted frames, build SegmentResults
            segment_results = []
            gate_fail_verdict = (
                None  # REC-63: last failing gate verdict across segments
            )
            for i, shot_id in enumerate(segment_shot_ids):
                ts_start, ts_end = expected_segment_timestamps[i]
                frame_path = boundary_frames[i] if i < len(boundary_frames) else None

                usable = True
                identity_score = None
                gate_error = None

                if gates:
                    if frame_path and frame_path.exists():
                        shot_dict = {
                            "shot_id": shot_id,
                            "pass_id": pass_id,
                            "segment_index": i,
                        }
                        for gate_fn in gates:
                            try:
                                verdict = gate_fn(frame_path, shot_dict)
                                if verdict.gate_name == "gate_2_video":
                                    total_score = verdict.details.get("total_score")
                                    if isinstance(total_score, (int, float)):
                                        identity_score = (
                                            1.0 - min(float(total_score), 5.0) / 5.0
                                        )
                                if not verdict.passed:
                                    usable = False
                                    gate_fail_verdict = verdict
                                    logger.info(
                                        "Gate %s failed for segment %d (%s): %s",
                                        verdict.gate_name,
                                        i,
                                        shot_id,
                                        verdict.reason,
                                    )
                            except Exception as gate_err:
                                usable = False
                                gate_error = str(gate_err)
                                logger.warning(
                                    "Gate error on segment %d (%s): %s",
                                    i,
                                    shot_id,
                                    gate_err,
                                )
                    else:
                        usable = False
                        gate_error = (
                            f"frame_extract_failed:infra: segment {i} "
                            "boundary frame missing"
                        )
                        gate_fail_verdict = GateVerdict(
                            passed=False,
                            gate_name="gate_2_video",
                            reason=gate_error,
                            details={"gate_closed_by": "infra"},
                            retriable=True,
                        )
                        logger.warning(
                            "Gate infra failure on segment %d (%s): %s",
                            i,
                            shot_id,
                            gate_error,
                        )

                segment_results.append(
                    SegmentResult(
                        source_shot_id=shot_id,
                        segment_index=i,
                        timestamp_start_s=ts_start,
                        timestamp_end_s=ts_end,
                        boundary_frame_path=str(frame_path) if frame_path else None,
                        identity_score=identity_score,
                        usable=usable,
                        gate_error=gate_error,
                    )
                )

            if self._cost_logger:
                self._cost_logger.record(pass_id, cost, model, "coverage_pass")

            # ------------------------------------------------------------------
            # Phase 9 — Flora canvas attach (NON-FATAL post-generation hook)
            # Phase 9 decision (orchestrator default, JT may override):
            #   review-worthy only — gate-failed takes NOT attached.
            #   Hook runs here, after gate loop, only for takes that reach
            #   the success return path.
            # Phase 9 decision (orchestrator default, JT may override):
            #   every successful take attaches a node; replace-node dedup
            #   deferred (needs node-ID registry).
            # ------------------------------------------------------------------
            try:
                _provider = (
                    result.metadata.get("provider")
                    if (result and getattr(result, "metadata", None))
                    else None
                )
                _take_usable = bool(segment_results) and all(
                    getattr(s, "usable", True) for s in segment_results
                )
                if _provider == "flora" and _take_usable:
                    _flora_api_key = os.environ.get("FLORA_API_KEY", "")
                    _flora_ws = os.environ.get("RECOIL_FLORA_WORKSPACE", "")
                    if not _flora_ws:
                        logger.debug(
                            "canvas_attach skipped for %s — "
                            "RECOIL_FLORA_WORKSPACE not set",
                            pass_id,
                        )
                    else:
                        from recoil.execution.providers.flora_projects import (
                            resolve_flora_project,
                        )
                        from recoil.pipeline.tools.flora_canvas_sync import (
                            attach_video_to_canvas,
                        )

                        _flora_prj = resolve_flora_project(
                            self._paths.project,
                            self._episode,
                            api_key=_flora_api_key,
                            workspace_id=_flora_ws,
                            create=True,
                        )
                        _video_bytes = video_path.read_bytes()
                        _file_name = video_path.name
                        _attach_result = attach_video_to_canvas(
                            video_bytes=_video_bytes,
                            file_name=_file_name,
                            shot_id=pass_id,
                            workspace_id=_flora_ws,
                            project_id=_flora_prj,
                        )
                        if _attach_result.get("success"):
                            logger.info(
                                "canvas_attach ok pass=%s asset_id=%s",
                                pass_id,
                                _attach_result.get("asset_id"),
                            )
                        else:
                            logger.warning(
                                "canvas_attach failed (non-fatal) pass=%s error=%s",
                                pass_id,
                                _attach_result.get("error"),
                            )
                elif _provider == "flora":
                    logger.debug(
                        "canvas_attach skipped for %s — gate-failed take (not review-worthy)",
                        pass_id,
                    )
                else:
                    logger.debug(
                        "canvas_attach skipped for %s — provider=%s (not flora)",
                        pass_id,
                        _provider,
                    )
            except Exception as _canvas_err:
                logger.warning(
                    "canvas_attach exception (non-fatal) pass=%s: %s",
                    pass_id,
                    _canvas_err,
                )

            # REC-63: a failed identity gate on any segment fails the take so the
            # EpisodeRunner strategy branch fires a retry. execute_video already
            # does this at its own gate-fail path (884-924); execute_pass did not.
            # error carries the verdict reason for failure classification.
            if gate_fail_verdict is not None:
                return PassResult(
                    pass_id=pass_id,
                    success=False,
                    video_path=str(video_path),
                    cost_usd=cost,
                    segment_results=tuple(segment_results),
                    model=model,
                    take_index=take_num,
                    expected_cuts=max(0, len(segment_shot_ids) - 1),
                    api_metadata=dict(result.metadata) if result.metadata else {},
                    error=gate_fail_verdict.reason,
                )

            return PassResult(
                pass_id=pass_id,
                success=True,
                video_path=str(video_path),
                cost_usd=cost,
                segment_results=tuple(segment_results),
                model=model,
                take_index=take_num,
                expected_cuts=max(0, len(segment_shot_ids) - 1),
                api_metadata=dict(result.metadata) if result.metadata else {},
            )

        except Exception as e:
            if isinstance(e, RuntimeError) and str(e) == _FFMPEG_UNAVAILABLE_MESSAGE:
                raise
            logger.error("execute_pass failed for %s: %s", pass_id, e)
            return PassResult(
                pass_id=pass_id,
                success=False,
                video_path=None,
                cost_usd=cost,
                model=model,
                error=str(e),
                expected_cuts=len(segment_shot_ids) - 1 if segment_shot_ids else 0,
            )

    def _extract_boundary_frames(
        self,
        video_path: Path,
        segment_timestamps: list[tuple[float, float]],
    ) -> list[Path | None]:
        """Extract one frame per segment at 0.5s into each segment.

        Uses ffmpeg -ss {ts} -i {video} -frames:v 1 per segment.
        Returns a list of frame paths (one per segment). If extraction
        fails for a segment, that entry is None.
        """
        import subprocess

        frames_dir = video_path.parent / "boundary_frames"
        frames_dir.mkdir(parents=True, exist_ok=True)

        extracted: list[Path | None] = []
        stem = video_path.stem  # e.g. "pass_ws_ep001_take1"

        for i, (ts_start, ts_end) in enumerate(segment_timestamps):
            # Extract at 0.5s into the segment, clamped to segment bounds
            extract_ts = ts_start + 0.5
            if extract_ts > ts_end:
                extract_ts = (ts_start + ts_end) / 2.0

            frame_path = frames_dir / f"{stem}_seg{i:02d}.jpg"

            try:
                subprocess.run(
                    [
                        "ffmpeg",
                        "-y",
                        "-ss",
                        f"{extract_ts:.3f}",
                        "-i",
                        str(video_path),
                        "-frames:v",
                        "1",
                        "-q:v",
                        "2",
                        str(frame_path),
                    ],
                    capture_output=True,
                    timeout=30,
                    check=True,
                )
                if frame_path.exists() and frame_path.stat().st_size > 0:
                    extracted.append(frame_path)
                    logger.debug("Extracted boundary frame: %s", frame_path.name)
                else:
                    extracted.append(None)
                    logger.warning(
                        "Boundary frame extraction produced empty file for segment %d",
                        i,
                    )
            except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
                logger.warning(
                    "ffmpeg boundary frame extraction failed for segment %d: %s",
                    i,
                    e,
                )
                extracted.append(None)

        return extracted


__all__ = [
    # Public symbols (Phase D — MF-3 + DEBT-9).
    # Curated aggressively per spec § Phase 2 § Step 2.6 — this module is
    # 2,711 LoC. Only true module-level names with public-API intent or
    # Phase 1-scan caller imports go here. Class methods (StepRunner.execute_*)
    # are reachable via the class, not via module-level __all__.
    # Core runtime classes.
    "SessionCostLogger",
    "StepRunner",
    # Gate factories.
    "make_identity_gate",
    "make_video_drift_gate",
    # Shot-label helpers (underscore-prefixed, but Phase 1 caller-graph
    # captures them — keep exported until callers migrate to a public alias).
    "_make_shot_label_canonical",
    "_make_shot_label_for_keyframe",
]
