#!/usr/bin/env python3
"""
test_via_steprunner.py — Test generation through the unified pipeline.

ALL generation tests MUST go through StepRunner so results appear in
ExecutionStore and Dailies. Never call KlingClient/FalAiKlingClient directly.

Usage:
    # Single-shot I2V (standard)
    python3 tools/test_via_steprunner.py --project starsend-test --shot EP001_SH02

    # Multi-prompt coverage (WS/MS/CU from hero frame)
    python3 tools/test_via_steprunner.py --project starsend-test --shot EP001_SH02 --mode coverage

    # Multi-prompt action (10s continuous take)
    python3 tools/test_via_steprunner.py --project starsend-test --shot EP001_SH02 --mode action --prompt "Kit walks forward, crouches, picks up device, stands back up"

    # Multi-shot sequence (multiple shots batched)
    python3 tools/test_via_steprunner.py --project starsend-test --shots EP001_SH02,EP001_SH02A,EP001_SH03

    # Specify model
    python3 tools/test_via_steprunner.py --project starsend-test --shot EP001_SH02 --model kling-o3

    # Elements test (character identity via fal.ai O3 Omni)
    python3 tools/test_via_steprunner.py --project starsend-test --shot EP001_SH02 --model kling-o3 --elements KIT
    python3 tools/test_via_steprunner.py --project starsend-test --shot EP001_SH02 --model kling-o3 --elements KIT,NAVI

    # Elements with auto location ref (pulls location from plan data)
    python3 tools/test_via_steprunner.py --project tartarus --shots EP001_SH02,EP001_SH03 --elements KIT --model kling-o3

    # Wan 2.7 I2V (auto-selects wan_i2v prompt from prompt_engine)
    python3 tools/test_via_steprunner.py --project afterimage --shot AFTERIMAGE_TEST_001 --model wan-2.7-i2v --start-frame path/to/start.jpg

    # Wan 2.7 In Between (start + end frame)
    python3 tools/test_via_steprunner.py --project afterimage --shot AFTERIMAGE_TEST_001 --model wan-2.7-i2v --start-frame path/to/start.jpg --end-frame path/to/end.jpg

    # Kling V3 end-frame test (verifies end_image_url fix)
    python3 tools/test_via_steprunner.py --project afterimage --shot AFTERIMAGE_TEST_001 --model kling-v3 --start-frame path/to/start.jpg --end-frame path/to/end.jpg

    # Wan R2V with character refs
    python3 tools/test_via_steprunner.py --project afterimage --shot AFTERIMAGE_TEST_001 --model wan-2.7-r2v --wan-refs SADIE

    # Seedance R2V with reference video + image refs (client projects)
    python3 tools/test_via_steprunner.py --project driver-beware \\
      --model seeddance-2.0 --pass-id REGEN_P04 \\
      --seedance-refs BLUE_CAR,DRIVER,DEER,SUBURBAN_STREET \\
      --ref-video "/path/to/edit_clip.mov" \\
      --prompt "[0s-3s] ... [3s-6s] ..." \\
      --duration 12 --aspect-ratio 16:9 --no-audio

    # Seedance R2V dry-run (prints payload without submitting)
    python3 tools/test_via_steprunner.py --project driver-beware \\
      --model seeddance-2.0 --pass-id REGEN_P04 \\
      --seedance-refs BLUE_CAR,DRIVER,DEER,SUBURBAN_STREET \\
      --ref-video "/path/to/edit_clip.mov" \\
      --prompt "..." --duration 12 --dry-run

    # Test plan from visual/plans/ (Afterimage test shots)
    python3 tools/test_via_steprunner.py --project afterimage --shot AFTERIMAGE_TEST_001 --plan-dir visual

    # Kling V2V edit (O3/O1 video-to-video — source video + image refs → edited video)
    python3 tools/dispatch_cli.py --project _probes --model kling-o3 \\
      --ref-video /path/to/source.mov --image-refs LOOK_TARGET \\
      --prompt "Cell-animated style transfer ..." \\
      --v2v-endpoint o1_edit_standard

    # Seedance V2V edit (source video → bytedance/seedance-2.0/{fast/}reference-to-video)
    python3 tools/dispatch_cli.py --project driver-beware \\
      --model seeddance-2.0 \\
      --source-video projects/driver-beware/raw/shot_001.mp4 \\
      --prompt "[0s-5s] Replace the sausage in the pan with a burned grilled cheese sandwich." \\
      --image-refs DRIVER:three_quarter --tier fast --duration 5

    # Seedance V2V edit dry-run (prints payload + estimated cost without submitting)
    python3 tools/dispatch_cli.py --project driver-beware \\
      --model seeddance-2.0 \\
      --source-video /path/to/source.mp4 \\
      --prompt "Change kitchen interior to gothic style" \\
      --image-paths /path/to/style.png --dry-run
"""

import argparse
import json
import os
import sys
import time

# CP-1 (2026-06-01): centralized through core.paths.
# tools/ -> pipeline/ -> recoil/  ->  recoil_root for `core` import
# _HERE is the directory holding this script; _RECOIL_ROOT is two levels up.
# NOTE(review): the imports below use the `recoil.` package prefix, which
# resolves from the parent of a `recoil/` directory — confirm that this
# sys.path entry is the one that makes them importable.
_HERE = os.path.dirname(os.path.abspath(__file__))
_RECOIL_ROOT = os.path.dirname(os.path.dirname(_HERE))
if _RECOIL_ROOT not in sys.path:
    sys.path.insert(0, _RECOIL_ROOT)  # bootstrap so `core` resolves pre-install

from recoil.core.paths import ensure_pipeline_importable  # noqa: E402

ensure_pipeline_importable()  # injects recoil/pipeline/ onto sys.path

from pathlib import Path  # noqa: E402
from typing import Optional  # noqa: E402
from recoil.core.paths import projects_root  # noqa: E402
from recoil.pipeline.core.dispatch import dispatch  # noqa: E402
from recoil.pipeline.core.dispatch_context import DispatchContext  # noqa: E402
from recoil.pipeline.core.cost import read_cost_from_result  # noqa: E402
from recoil.pipeline._lib.dispatch_payload import (  # noqa: E402
    PayloadContext,
    build_unified_payload,
)


import logging as _pass_store_logging  # noqa: E402

# Logger used to report (and swallow) PassStore registration failures in
# _register_test_dispatch_with_passstore.
_pass_store_log = _pass_store_logging.getLogger("test_via_steprunner.passstore")


def _derive_single_shot_tag(shot: dict) -> str:
    """Derive a semantic tag for a single-shot dispatch from shot.characters.

    Replaces the blanket SOLO_ENV default that A3 logged for every dispatch.
    Returns one of:
        SOLO_<CHAR>             1 character
        DUO_<CHAR1>_<CHAR2>     2 characters (alphabetized for stability)
        MULTI_CHAR              3+ characters
        SOLO_ENV                no characters (environment-only)
    """
    chars: list[str] = []
    asset = (shot or {}).get("asset_data", {}) if isinstance(shot, dict) else {}
    for entry in asset.get("characters") or []:
        if isinstance(entry, dict):
            cid = entry.get("char_id") or entry.get("name") or ""
        else:
            cid = str(entry)
        cid = cid.strip().upper().replace("-", "_")
        if cid:
            chars.append(cid)
    n = len(chars)
    if n == 0:
        return "SOLO_ENV"
    if n == 1:
        return f"SOLO_{chars[0]}"
    if n == 2:
        a, b = sorted(chars)
        return f"DUO_{a}_{b}"
    return "MULTI_CHAR"


def _derive_multi_shot_tag(shot_dicts: list[dict]) -> str:
    """Tag for multi-shot dispatch—shared anchor if all shots share a character.

    Rules:
      - All shots share exactly 1 character -> A_<CHAR>
      - All shots share exactly 2 characters -> A_<CHAR1>_<CHAR2> (alphabetized)
      - All shots share a location but no character -> COV_<LOC>
      - Otherwise -> COV_ENV
    """
    if not shot_dicts:
        return "COV_ENV"
    char_sets: list[set[str]] = []
    locs: set[str] = set()
    for sd in shot_dicts:
        chars: set[str] = set()
        asset = sd.get("asset_data", {}) if isinstance(sd, dict) else {}
        for entry in asset.get("characters") or []:
            if isinstance(entry, dict):
                cid = entry.get("char_id") or entry.get("name") or ""
            else:
                cid = str(entry)
            cid = cid.strip().upper().replace("-", "_")
            if cid:
                chars.add(cid)
        char_sets.append(chars)
        loc = asset.get("location_id") or sd.get("location_id")
        if loc:
            locs.add(str(loc).upper().replace("-", "_"))
    shared = set.intersection(*char_sets) if char_sets else set()
    if len(shared) == 1:
        return f"A_{next(iter(shared))}"
    if len(shared) == 2:
        a, b = sorted(shared)
        return f"A_{a}_{b}"
    if len(locs) == 1:
        return f"COV_{next(iter(locs))}"
    return "COV_ENV"


def _derive_episode_id(primary_id: str | None) -> str:
    """Parse EP{NNN} from any identifier; default to 'EP001' if unparseable."""
    import re as _re

    if primary_id:
        m = _re.match(r"[Ee][Pp](\d+)", primary_id)
        if m:
            return f"EP{int(m.group(1)):03d}"
    return "EP001"


def _derive_episode_int(primary_id: str | None) -> int:
    """Parse EP{NNN} → int from any identifier; default to 1 if unparseable.

    Used to populate DispatchContext.episode (Optional[int]) for receipts.
    """
    import re as _re

    if primary_id:
        m = _re.match(r"[Ee][Pp](\d+)", primary_id)
        if m:
            return int(m.group(1))
    return 1


def _derive_episode_token(args) -> str:
    """Return 'ep_NNN' for use in the --episode arg of audit_dispatch.

    The identifier is taken from args.shot when set; otherwise from the first
    entry of args.shots (a comma-separated string, or a list/tuple). When no
    identifier is available the default of _derive_episode_int applies, giving
    ep_001. Used by the R5 SYNTHESIS §1.8 post-fire inspect-sidecars hook.
    """
    candidate = getattr(args, "shot", None) or ""
    if not candidate:
        batch = getattr(args, "shots", None) or ""
        if isinstance(batch, str):
            if batch:
                candidate = batch.split(",")[0].strip()
        elif isinstance(batch, (list, tuple)) and batch:
            candidate = str(batch[0])
    ep_int = _derive_episode_int(candidate or None)
    return f"ep_{ep_int:03d}"


def _post_fire_inspect_sidecar(args, sidecar_path) -> None:
    """Run audit_dispatch's sidecar inspection on a just-written sidecar.

    R5 SYNTHESIS §1.8. The audit script is launched as a child process from
    the repository root with a 60-second timeout. The call is non-blocking
    for the dispatch: any exception is reported as a WARNING line and
    swallowed (the video is already paid for; the regression is surfaced for
    the next round). Wrapped in try/except per Gemini spec-review.

    Args:
        args: Parsed CLI namespace; ``project`` and the shot identifiers used
            by _derive_episode_token are read from it.
        sidecar_path: Path of the sidecar file to inspect.
    """
    try:
        import os as _os
        import subprocess as _subprocess
        import sys as _sys
        from pathlib import Path as _P

        repo_root = _P(__file__).resolve().parents[3]
        child_env = dict(_os.environ)
        child_env["PYTHONPATH"] = str(repo_root)
        command = [
            _sys.executable,
            "recoil/pipeline/tools/audit_dispatch.py",
            "--project",
            args.project,
            "--episode",
            _derive_episode_token(args),
            "--inspect-sidecars",
            "--sidecar-path",
            str(sidecar_path),
        ]
        # Deliberately no check=True: a non-zero exit from the audit must not
        # fail the dispatch. Its findings appear on the child's own output.
        _subprocess.run(command, cwd=str(repo_root), env=child_env, timeout=60)
    except Exception as _post_err:  # noqa: BLE001
        print(f"WARNING: post-fire inspect-sidecars failed: {_post_err}")


def _normalise_passstore_id(original_id: str, episode_id: str) -> str:
    """Return a PassStore-compatible pass_id (must start with EP<digits>).

    If original_id already starts with EP<digits>, return unchanged.
    Otherwise prefix with episode_id and 'TEST' marker.
    """
    import re as _re

    if _re.match(r"[Ee][Pp]\d+", original_id or ""):
        return original_id
    safe = (
        _re.sub(r"[^A-Za-z0-9_]+", "_", original_id or "unknown").strip("_")
        or "unknown"
    )
    return f"{episode_id}_TEST_{safe}"


def _register_test_dispatch_with_passstore(
    *,
    project: str,
    original_pass_id: str,
    segment_shot_ids: list[str],
    model: str,
    prompt: str | None,
    output_path: str | None,
    cost_usd: float,
    latency_seconds: float,
    start_frame: str | None = None,
    refs: list[str] | None = None,
    duration: int | None = None,
    aspect_ratio: str | None = None,
) -> None:
    """Record a successful test dispatch in the project's PassStore.

    The pass id is normalised to the EP<digits> form, the pass is created if
    it does not exist yet, one take record is appended, and the pass is then
    updated with the cost and status "completed".

    Any exception is logged as a WARNING and swallowed — the dispatch exit
    code must be unaffected. The `origin: test_dispatch` field is
    load-bearing: Phase 8's reclaim tool and any future analysis filter on it.
    """
    try:
        from datetime import datetime, timezone

        try:
            from recoil.execution.pass_store import PassStore
        except ImportError:
            import sys as _sys
            from pathlib import Path as _Path

            # Retry after putting the directory two levels above this file
            # on sys.path.
            fallback_root = str(_Path(__file__).resolve().parents[2])
            if fallback_root not in _sys.path:
                _sys.path.insert(0, fallback_root)
            from recoil.execution.pass_store import PassStore  # type: ignore

        # The episode comes from the pass id, else from the first segment shot.
        id_source = original_pass_id
        if not id_source:
            id_source = segment_shot_ids[0] if segment_shot_ids else None
        episode_id = _derive_episode_id(id_source)
        ps_pass_id = _normalise_passstore_id(original_pass_id, episode_id)

        store = PassStore(project)
        if store.get_pass(ps_pass_id) is None:
            store.create_pass(
                pass_id=ps_pass_id,
                segment_shot_ids=list(segment_shot_ids or [ps_pass_id]),
            )

        cost = float(cost_usd or 0.0)
        stamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        take_record = {
            "origin": "test_dispatch",
            "origin_pass_id": original_pass_id,
            "model": model,
            "prompt": (prompt or "")[:500],  # stored prompt is capped at 500 chars
            "cost_usd": cost,
            "latency_seconds": float(latency_seconds or 0.0),
            "output_path": output_path,
            "start_frame": start_frame,
            "refs": list(refs or []),
            "duration": duration,
            "aspect_ratio": aspect_ratio,
            "dispatched_at": stamp,
            "dispatched_by": "test_via_steprunner.py",
        }
        store.append_pass_take(ps_pass_id, take_record)
        store.update_pass(ps_pass_id, cost_usd=cost, status="completed")
    except Exception as e:  # noqa: BLE001
        _pass_store_log.warning(
            "PassStore registration failed for test dispatch (project=%s, pass_id=%s): %s",
            project,
            original_pass_id,
            e,
        )


def get_store(project):
    """Return an ExecutionStore constructed for *project*.

    ExecutionStore is imported inside the function rather than at module load.
    """
    from recoil.execution.execution_store import ExecutionStore

    return ExecutionStore(project)


def get_plan_shot(project, shot_id, plan_dir="visual"):
    """Load the plan containing *shot_id* and return that shot's entry.

    The episode number is parsed from a leading ``EP<digits>`` in *shot_id*
    (defaulting to 1). The plan is ``ep_NNN_plan.json`` in the project's plans
    directory; if that file is absent, ``test_plan.json`` in the same
    directory is used instead. Both the flat ``{"shots": [...]}`` layout and
    the ``{"sequences": [{"shots": [...]}]}`` layout are supported.

    Args:
        project: Project slug.
        shot_id: Shot identifier to look up.
        plan_dir: Accepted for call compatibility; not consulted here — the
            plans directory is resolved through ProjectPaths.

    Returns:
        ``(shot, ep_num, plan)``: the matching shot dict, the parsed episode
        number, and the whole decoded plan.

    Exits the process with status 1 (after printing an error) when no plan
    file exists or the shot is not in the plan.
    """
    import re

    ep_match = re.match(r"EP(\d+)", shot_id)
    ep_num = int(ep_match.group(1)) if ep_match else 1

    # v3 layout: plans live under _pipeline/state/visual/plans/ — resolve via ProjectPaths
    from recoil.core.paths import ProjectPaths as _CorePaths

    _paths = _CorePaths.for_project(project)
    plan_path = _paths.plans_dir / f"ep_{ep_num:03d}_plan.json"

    # Fallback: try test_plan.json for test shots (AFTERIMAGE_TEST_*, etc.)
    if not plan_path.exists():
        test_plan_path = _paths.plans_dir / "test_plan.json"
        if test_plan_path.exists():
            plan_path = test_plan_path

    if not plan_path.exists():
        print(f"ERROR: Plan not found at {plan_path}")
        print("  Also checked: test_plan.json variants")
        sys.exit(1)

    # Decode from bytes so json detects the encoding (UTF-8/16/32, BOM-aware)
    # instead of depending on the platform's default text encoding.
    plan = json.loads(plan_path.read_bytes())
    # Support both flat {"shots": [...]} and sequences {"sequences": [{"shots": [...]}]}
    # `or []` also tolerates an explicit `"shots": null`.
    if "sequences" in plan:
        all_shots = [s for seq in plan["sequences"] for s in (seq.get("shots") or [])]
    else:
        all_shots = plan.get("shots") or []
    shot = next((s for s in all_shots if s.get("shot_id") == shot_id), None)
    if not shot:
        print(f"ERROR: Shot {shot_id} not found in plan {plan_path.name}")
        ids_in_plan = [s["shot_id"] for s in all_shots if "shot_id" in s]
        if ids_in_plan:
            print(f"Available: {ids_in_plan}")
        elif "sequences" in plan:
            print("  This plan uses sequences format — shots have no shot_id.")
            print("  Sequences:", [seq.get("id") for seq in plan["sequences"]])
        sys.exit(1)
    return shot, ep_num, plan


def find_hero_frame(project, shot_id, store):
    """Locate a hero/first frame image for *shot_id*.

    Lookup order:
      1. ``gate_results["hero_frame"]`` then ``gate_results["first_frame"]``
         from ``store.get_shot(shot_id)``. An absolute path is checked as-is;
         a relative one is tried under the project directory and then under
         the current working directory.
      2. The most recently modified file in
         ``<project>/output/frames/ep_NNN/`` whose name contains the shot's
         frame stem. The stem is the shot id with a leading ``EP<digits>_``
         turned into ``shot_`` and every ``SH`` removed
         (``EP002_SH05`` -> ``shot_05``).

    Args:
        project: Project slug.
        shot_id: Shot identifier.
        store: Object exposing ``get_shot(shot_id)``.

    Returns:
        Path of the frame, or None when nothing is found.
    """
    import re

    shot_state = store.get_shot(shot_id)
    if shot_state:
        # `or {}` tolerates an explicit `"gate_results": null`.
        gate = shot_state.get("gate_results") or {}
        for key in ("hero_frame", "first_frame"):
            frame = gate.get(key)
            if not frame:
                continue
            frame_path = Path(frame)
            if frame_path.is_absolute():
                search = [frame_path]
            else:
                search = [
                    root / frame_path
                    for root in (projects_root() / project, Path("."))
                ]
            for p in search:
                if p.exists():
                    return p

    # Fallback: search frames directory
    ep_match = re.match(r"EP(\d+)", shot_id)
    ep_num = int(ep_match.group(1)) if ep_match else 1
    frames_dir = projects_root() / project / "output" / "frames" / f"ep_{ep_num:03d}"
    if frames_dir.exists():
        # Strip whichever episode prefix the id carries — not only EP001 —
        # so the pattern matches the directory chosen from ep_num.
        stem = re.sub(r"^EP\d+_", "shot_", shot_id).replace("SH", "")
        # Find latest take
        candidates = sorted(
            frames_dir.glob(f"*{stem}*"),
            key=lambda p: p.stat().st_mtime,
            reverse=True,
        )
        if candidates:
            return candidates[0]
    return None


def build_elements(char_ids_str, project, plan_shots=None):
    """Build fal.ai Elements payload from comma-separated char IDs.

    IDs are stripped and upper-cased; empty tokens (e.g. from a trailing
    comma) are dropped. When plan_shots is provided, the batch location from
    extract_batch_location is passed to ElementManager so it can be included
    as an element (if refs exist and slots remain).

    Args:
        char_ids_str: Comma-separated character IDs.
        project: Project slug.
        plan_shots: Optional plan shot dicts used to pick a location.

    Returns:
        (payload, char_ids, has_location_element, total_elements). When no
        payload could be built the tuple is ``(None, char_ids, False, 0)``.
    """
    from recoil.pipeline._lib.elements import ElementManager, extract_batch_location

    # Skip blank tokens so "KIT," does not produce an empty character ID.
    char_ids = [c.strip().upper() for c in char_ids_str.split(",") if c.strip()]
    print(f"Elements: {char_ids}")

    # Extract location from plan shots
    location_id = None
    if plan_shots:
        location_id = extract_batch_location(plan_shots)
        if location_id:
            print(f"  Location: {location_id}")

    payload, has_location, total_elements = ElementManager.build_elements_with_info(
        char_ids,
        project,
        location_id=location_id,
    )
    if not payload:
        print("WARNING: No element refs found — proceeding without Elements")
        return None, char_ids, False, 0

    elements = payload.get("elements", [])
    n_elements = len(elements)
    for i, elem in enumerate(elements):
        frontal = "OK" if elem.get("frontal_image_url") else "MISSING"
        n_refs = len(elem.get("reference_image_urls", []))
        # When a location element is present it is labelled as the last one.
        label = (
            "location"
            if (has_location and i == n_elements - 1)
            else f"char/{char_ids[i] if i < len(char_ids) else '?'}"
        )
        print(f"  Element {i + 1}: {label} frontal={frontal}, {n_refs} additional refs")

    if has_location:
        print(f"  Location element included as @Element{total_elements}")

    return payload, char_ids, has_location, total_elements


# Image file suffixes tried, in this order, by the client-ref resolvers below.
_IMG_EXTS = (".png", ".jpg", ".jpeg", ".webp")


def _require_real_image(path: Path, min_bytes: int = 1024) -> Path:
    """Return *path*, raising RuntimeError if the file is under *min_bytes*."""
    size = path.stat().st_size
    if size < min_bytes:
        raise RuntimeError(
            f"Ref image is a broken LFS pointer ({size} bytes): {path}\n"
            f"Restore the real image from Dropbox version history or regenerate."
        )
    return path


def _last_glob_hit(directory: Path, patterns, exts) -> Path | None:
    """Return the last sorted match of the first pattern+ext that matches."""
    for pattern in patterns:
        for ext in exts:
            hits = sorted(directory.glob(f"{pattern}{ext}"))
            if hits:
                return hits[-1]
    return None


def _resolve_client_frontal(project: str, asset_id: str) -> Path | None:
    """Resolve a client-project asset ID to its primary (frontal/establishing) ref.

    The asset ID is stripped and lower-cased before lookup. Search order:

    1. v3 layout, ``assets/<kind>/<id>/`` for kind in identity, char, prop,
       loc. Files are globbed per pattern and extension (to absorb the
       ``_v{NN}`` version suffix) and the last match in sorted order wins:
         - identity / char: ``{id}_*hero*``, ``{id}_*front*``
         - prop:            ``{id}_*hero*``, ``{id}_*front*``, ``frontal*``
         - loc:             ``{id}_*hero*``, ``*_loc_hero*``,
                            ``*establishing*``, ``*hero*``
    2. v2 layout under ``refs/``:
         - characters/{id}/picks/frontal.{png,jpg,jpeg,webp}
         - props/{id}/picks/frontal.{png,jpg,jpeg,webp}
         - locations/{id}/{id}_establishing.{ext}   (prefixed — driver-beware convention)
         - locations/{id}/{id}_hero.{ext}
         - locations/{id}/establishing.{ext}
         - locations/{id}/hero.{ext}

    Returns:
        The first match, or None when nothing exists.

    Raises:
        RuntimeError: the first match is smaller than 1024 bytes, which is
            treated as an unresolved LFS pointer rather than a real image.
    """
    aid = asset_id.strip().lower()
    proj_root = projects_root() / project

    # v3 paths (assets/identity/, assets/prop/, assets/loc/)
    # `{aid}_*front*` already matches every `{aid}_*frontal*` file, so no
    # separate frontal pattern is needed for identity/char.
    character_patterns = (f"{aid}_*hero*", f"{aid}_*front*")
    v3_searches = (
        ("identity", character_patterns),
        ("char", character_patterns),
        ("prop", (f"{aid}_*hero*", f"{aid}_*front*", "frontal*")),
        ("loc", (f"{aid}_*hero*", "*_loc_hero*", "*establishing*", "*hero*")),
    )
    for kind_dir, patterns in v3_searches:
        asset_dir = proj_root / "assets" / kind_dir / aid
        if not asset_dir.is_dir():
            continue
        found = _last_glob_hit(asset_dir, patterns, _IMG_EXTS)
        if found is not None:
            return _require_real_image(found)

    # v2 paths (refs/characters/, refs/props/, refs/locations/)
    refs_root = proj_root / "refs"
    candidates: list[Path] = []
    for bucket in ("characters", "props"):
        candidates.extend(
            refs_root / bucket / aid / "picks" / f"frontal{ext}" for ext in _IMG_EXTS
        )
    for stem in (f"{aid}_establishing", f"{aid}_hero", "establishing", "hero"):
        candidates.extend(
            refs_root / "locations" / aid / f"{stem}{ext}" for ext in _IMG_EXTS
        )
    for p in candidates:
        if p.exists():
            return _require_real_image(p)
    return None


def _resolve_client_three_quarter(project: str, asset_id: str) -> Path | None:
    """Resolve the three-quarter/secondary ref for a client asset. None if absent.

    The asset ID is stripped and lower-cased before lookup.

    Characters/props: `refs/<bucket>/<id>/picks/three_quarter.{ext}`.
    Locations (`refs/locations/<id>/`): `{id}_grid`, `{id}_wide`, `grid` or
    `wide` first; failing those, the first image file (in sorted order) whose
    stem is not one of the primary stems `{id}_establishing`, `{id}_hero`,
    `establishing`, `hero` — which is how driver-beware's
    `establishing_clean.png`, `establishing_white_line.png`, etc. are picked up.
    """
    aid = asset_id.strip().lower()
    refs_root = projects_root() / project / "refs"

    for bucket in ("characters", "props"):
        picks_dir = refs_root / bucket / aid / "picks"
        for ext in _IMG_EXTS:
            candidate = picks_dir / f"three_quarter{ext}"
            if candidate.exists():
                return candidate

    loc_dir = refs_root / "locations" / aid
    if not loc_dir.exists():
        return None

    # Structured secondary names take priority.
    for stem in (f"{aid}_grid", f"{aid}_wide", "grid", "wide"):
        for ext in _IMG_EXTS:
            candidate = loc_dir / f"{stem}{ext}"
            if candidate.exists():
                return candidate

    # Otherwise fall back to any image that is not a primary ref.
    primary_stems = {f"{aid}_establishing", f"{aid}_hero", "establishing", "hero"}
    for entry in sorted(loc_dir.iterdir()):
        if not entry.is_file():
            continue
        if entry.suffix.lower() not in _IMG_EXTS:
            continue
        if entry.stem.lower() in primary_stems:
            continue
        return entry
    return None


# Back-compat shim — existing callers expect a single resolver.
# The old name is an alias of the frontal resolver.
_resolve_client_ref = _resolve_client_frontal


# Phase 7 — Multi-view image-ref resolver. Extends the frontal-only
# CLI surface so V2V edits and Kling O3 Elements dispatches can target
# any of the canonical asset views without bypassing dispatch_cli.

# Maps a view name to its resolver; each resolver is called as
# resolver(project, asset_id).
_RESOLVER_BY_VIEW: dict[str, callable] = {
    "frontal": _resolve_client_frontal,
    "three_quarter": _resolve_client_three_quarter,
    # Profile + hero get inline resolvers — same shape as
    # _resolve_client_frontal but distinct stem sets. They are registered
    # further down, once their functions have been defined.
}


def _resolve_client_profile(project: str, asset_id: str) -> "Path | None":
    """Resolve the profile ref for a client asset. None if absent.

    The asset ID is stripped and lower-cased, then
    `refs/<bucket>/<id>/picks/profile.{ext}` is tried for the characters and
    props buckets, in that order. Locations are not searched, so they always
    yield None.
    """
    aid = asset_id.strip().lower()
    refs_root = projects_root() / project / "refs"
    candidates = [
        refs_root / bucket / aid / "picks" / f"profile{ext}"
        for bucket in ("characters", "props")
        for ext in _IMG_EXTS
    ]
    for candidate in candidates:
        if candidate.exists():
            return candidate
    return None


def _resolve_client_hero(project: str, asset_id: str) -> "Path | None":
    """Resolve the hero ref for a client asset. None if absent.

    The asset ID is stripped and lower-cased before lookup.

    Characters/props: `refs/<bucket>/<id>/picks/hero.{ext}`.
    Locations: `refs/locations/<id>/{id}_hero.{ext}`, then `hero.{ext}`.
    """
    aid = asset_id.strip().lower()
    refs_root = projects_root() / project / "refs"

    for bucket in ("characters", "props"):
        picks_dir = refs_root / bucket / aid / "picks"
        for ext in _IMG_EXTS:
            candidate = picks_dir / f"hero{ext}"
            if candidate.exists():
                return candidate

    loc_dir = refs_root / "locations" / aid
    if not loc_dir.exists():
        return None
    for stem in (f"{aid}_hero", "hero"):
        for ext in _IMG_EXTS:
            candidate = loc_dir / f"{stem}{ext}"
            if candidate.exists():
                return candidate
    return None


# Register the profile and hero resolvers now that they are defined.
_RESOLVER_BY_VIEW["profile"] = _resolve_client_profile
_RESOLVER_BY_VIEW["hero"] = _resolve_client_hero

# View names accepted by _resolve_client_view and _parse_image_refs_arg.
_VALID_VIEWS = frozenset(_RESOLVER_BY_VIEW.keys())
# Aspect-ratio strings — presumably the values Seedance accepts; the check
# that uses this set is outside this part of the file (TODO confirm).
_SEEDANCE_VALID_AR = frozenset({"9:16", "16:9", "1:1", "4:3", "3:4", "21:9"})


def _resolve_client_view(
    project: str,
    asset_id: str,
    view: str = "frontal",
) -> "Path | None":
    """Resolve an asset ref for a named view.

    Args:
        project:   Project slug.
        asset_id:  Canonical asset id, passed unchanged to the view's resolver.
        view:      One of _VALID_VIEWS (stripped and lower-cased first);
                   an empty or None value means "frontal".

    Returns:
        Whatever the view's resolver returns: a Path, or None if no file exists.

    Raises:
        ValueError: view not in _VALID_VIEWS.
    """
    normalized = (view or "frontal").strip().lower()
    if normalized in _VALID_VIEWS:
        resolver = _RESOLVER_BY_VIEW[normalized]
        return resolver(project, asset_id)
    raise ValueError(f"unknown view {normalized!r}; valid: {sorted(_VALID_VIEWS)}")


def _parse_image_refs_arg(
    spec: str,
) -> list[tuple[str, str]]:
    """Parse `--image-refs` into [(asset_id_upper, view), ...] tuples.

    Supports the legacy form (`JADE,WREN`) and the new view-tagged form
    (`JADE:three_quarter,WREN:profile`). Missing view → "frontal".

    Raises:
        ValueError: malformed token or unknown view.
    """
    out: list[tuple[str, str]] = []
    if not spec:
        return out
    for raw in spec.split(","):
        token = raw.strip()
        if not token:
            continue
        if ":" in token:
            aid, view = token.split(":", 1)
            view = view.strip().lower() or "frontal"
        else:
            aid, view = token, "frontal"
        if view not in _VALID_VIEWS:
            raise ValueError(
                f"unknown view in --image-refs token {token!r}; "
                f"valid: {sorted(_VALID_VIEWS)}"
            )
        out.append((aid.strip().upper(), view))
    return out


def _parse_image_paths_arg(
    spec: str,
    project: str,
) -> list["Path"]:
    """Parse `--image-paths` (comma-separated absolute or project-relative
    paths) into a list of Paths. Validates every file exists.

    Args:
        spec:    Raw `--image-paths` value.
        project: Project slug (only used to resolve project-relative paths).

    Returns:
        List of resolved Paths in declaration order.

    Raises:
        FileNotFoundError: any path doesn't exist.
    """
    paths: list["Path"] = []
    if not spec:
        return paths
    proj_root = projects_root() / project
    for raw in spec.split(","):
        token = raw.strip()
        if not token:
            continue
        p = Path(token).expanduser()
        if not p.is_absolute():
            p = (proj_root / token).resolve()
        if not p.exists():
            raise FileNotFoundError(f"--image-paths token not found: {p}")
        paths.append(p)
    return paths


# ============================================================================
# CLI → PayloadContext adapter (payload_assembly convergence, 2026-05-25)
# ============================================================================
# Translates argparse.Namespace into a PayloadContext that build_unified_payload
# can consume. Refs are resolved at the CLI edge (including client-project
# fallback via _resolve_client_*) so build_unified_payload only ever sees
# pre-resolved Path lists for CLI calls. SYNTHESIS Condition 5: client-project
# fallback stays at the CLI edge, never imported into dispatch_payload.py.


def _resolve_wan_character_refs(project: str, char_ids_csv: str) -> "list[Path]":
    """Resolve wan-style refs (frontal + three-quarter per character).

    Character IDs are comma-separated, stripped and lower-cased; blank tokens
    are skipped. For each character two layouts are tried:

      1. Canonical Tartarus pipeline output, in the directory returned by
         ProjectPaths.asset_kind_dir("identity", cid): every existing
         {cid}_front.{ext} and {cid}_three_quarter.{ext}. If none exist, the
         first existing file among {cid}_hero_v2.{ext} then {cid}_hero.{ext}.
      2. Client-project layout, used only when layer 1 found nothing: the
         results of _resolve_client_frontal and _resolve_client_three_quarter.

    CLI-edge resolution per SYNTHESIS Condition 5 — never imported into
    dispatch_payload.py.

    Returns:
        All resolved paths, grouped per character in input order.
    """
    from recoil.core.paths import ProjectPaths as _ProjectPaths

    img_exts = (".jpg", ".png", ".jpeg", ".webp")
    layout = _ProjectPaths.for_project(project)
    wanted = [c.strip().lower() for c in char_ids_csv.split(",") if c.strip()]

    resolved: list[Path] = []
    for cid in wanted:
        found: list[Path] = []

        # Layer 1 — canonical assets/identity layout
        identity_dir = layout.asset_kind_dir("identity", cid)
        if identity_dir.exists():
            turnarounds = (
                identity_dir / f"{stem}{ext}"
                for stem in (f"{cid}_front", f"{cid}_three_quarter")
                for ext in img_exts
            )
            found = [candidate for candidate in turnarounds if candidate.exists()]
            if not found:
                # No turnarounds: settle for a single hero image.
                heroes = (
                    identity_dir / f"{stem}{ext}"
                    for stem in (f"{cid}_hero_v2", f"{cid}_hero")
                    for ext in img_exts
                )
                hero = next((h for h in heroes if h.exists()), None)
                if hero is not None:
                    found.append(hero)

        # Layer 2 — client-project layout fallback
        if not found:
            for resolver in (_resolve_client_frontal, _resolve_client_three_quarter):
                ref = resolver(project, cid)
                if ref and Path(ref).exists():
                    found.append(Path(ref))

        resolved.extend(found)
    return resolved


def _dispatch_kling_o3_elements(args, store) -> int:
    """Kling O3 Elements dispatch (self-contained) — elements + prompt, with
    optional start/end frames and image refs; no --shot/plan required. For
    client-project ad-hoc shots.

    Routing note (decided by the caller's mode detection): --model is
    kling-o3, --elements is provided, and --shot/--shots are absent.
    --start-frame is accepted but optional here; see the warning printed
    when it is supplied.

    Flow: validate CLI arguments, upload element / image refs to fal.ai, run
    the pre-dispatch validator (unless --skip-validator), stop on --dry-run,
    otherwise dispatch through StepRunner and register a successful result.

    Args:
        args: Parsed CLI namespace (argparse or SimpleNamespace).
        store: Execution store handed to StepRunner.

    Returns:
        0 on success or --dry-run; 1 for a missing file/ref, a missing
        fal_client install, or a failed dispatch; 2 for usage errors or a
        validator block.
    """
    if not args.prompt:
        print("ERROR: --prompt is required for Kling O3 Elements standalone.")
        return 2

    # Drop blank entries ("KIT," / "KIT, ,NAVI") so a stray comma is never
    # treated as an element with an empty id.
    raw_element_ids = [c.strip() for c in (args.elements or "").split(",")]
    raw_element_ids = [c for c in raw_element_ids if c]
    if not raw_element_ids:
        print("ERROR: --elements must list at least one element id.")
        return 2
    element_char_ids = [c.upper() for c in raw_element_ids]

    # Usage errors are reported before anything is uploaded to fal.ai.
    if args.image_refs and args.image_paths:
        print("ERROR: --image-refs and --image-paths are mutually exclusive.")
        return 2

    # Start frame is OPTIONAL on Kling O3 R2V. Per capability_matrix.json + Gemini
    # research (2026-04-22), Kling's reference-to-video endpoint cannot honor both
    # elements AND start_image_url — they fight architecturally (latent-init vs
    # cross-attention). The validator will block if both are present. For the
    # elements-only dispatch, omit --start-frame entirely.
    start_frame: Optional[Path] = None
    if args.start_frame:
        start_frame = Path(args.start_frame)
        if not start_frame.exists():
            print(f"ERROR: --start-frame not found: {start_frame}")
            return 1
        print(
            f"  start frame: {start_frame.name} (WARN: will likely be ignored or fight elements on Kling O3 R2V)"
        )

    end_frame = None
    if args.end_frame:
        end_frame = Path(args.end_frame)
        if not end_frame.exists():
            print(f"ERROR: --end-frame not found: {end_frame}")
            return 1
        print(f"  end frame:   {end_frame.name}")

    # Element refs are uploaded with fal_client; without it nothing below
    # can run.
    try:
        import fal_client as _fal
    except ImportError:
        print("ERROR: fal_client not installed — required for element uploads.")
        return 1

    print(f"Elements: {element_char_ids}")

    # Build the elements payload with the client-project ref resolvers. A
    # missing frontal ref is a hard error on this path (no fallback resolver
    # is consulted); only the secondary ref is optional.
    elements_list = []
    for cid in element_char_ids:
        frontal = _resolve_client_frontal(args.project, cid)
        three_q = _resolve_client_three_quarter(args.project, cid)
        if frontal is None:
            print(
                f"ERROR: no frontal/establishing ref found for '{cid}' in client project"
            )
            print(
                f"  searched characters/props/locations for frontal|establishing|hero "
                f"in {_IMG_EXTS} (plain and {cid.lower()}_-prefixed)"
            )
            return 1
        frontal_url = _fal.upload_file(str(frontal))
        if three_q:
            ref_urls = [_fal.upload_file(str(three_q))]
            secondary_note = three_q.name
        else:
            # fal.ai rejects elements with empty reference_image_urls — fall back
            # to duplicating the frontal (same behavior as ElementManager).
            ref_urls = [frontal_url]
            secondary_note = "frontal (duplicated — no secondary ref found)"
        print(
            f"  @Element{len(elements_list) + 1} = {cid:<20s} ref={frontal.name}, secondary={secondary_note}"
        )
        elements_list.append(
            {"frontal_image_url": frontal_url, "reference_image_urls": ref_urls}
        )
    elements_payload = {"elements": elements_list}

    # --image-refs OR --image-paths — Kling O3 R2V image_urls channel
    # (@Image tokens). Both routes resolve to the same image_urls[] list.
    # (Mutual exclusivity was already enforced above.)
    image_urls_list: list[str] = []
    image_ref_ids: list[str] = []
    if args.image_paths:
        try:
            local_paths = _parse_image_paths_arg(args.image_paths, args.project)
        except FileNotFoundError as e:
            print(f"ERROR: {e}")
            return 1
        for ref_path in local_paths:
            ref_url = _fal.upload_file(str(ref_path))
            tag = ref_path.stem.upper()
            print(
                f"  @Image{len(image_urls_list) + 1}   = {tag:<20s} "
                f"ref={ref_path.name}  (literal path)"
            )
            image_urls_list.append(ref_url)
            image_ref_ids.append(tag)
    elif args.image_refs:
        try:
            parsed = _parse_image_refs_arg(args.image_refs)
        except ValueError as e:
            print(f"ERROR: --image-refs: {e}")
            return 2
        print(f"Image refs: {[aid for aid, _ in parsed]}")
        for aid, view in parsed:
            ref_path = _resolve_client_view(args.project, aid, view)
            if ref_path is None:
                print(
                    f"ERROR: no {view!r} ref found for image-ref "
                    f"'{aid}' in project {args.project!r}"
                )
                return 1
            ref_url = _fal.upload_file(str(ref_path))
            print(
                f"  @Image{len(image_urls_list) + 1}   = {aid:<20s} "
                f"ref={ref_path.name}  (view={view})"
            )
            image_urls_list.append(ref_url)
            image_ref_ids.append(aid)
    # An empty list is sent as None so the key reads as "not provided".
    image_urls_payload = image_urls_list if image_urls_list else None

    pass_id = args.pass_id or f"KLING_O3_{int(time.time())}"
    duration = args.duration

    print()
    print("Mode:       Kling O3 Elements (standalone)")
    print(f"Pass ID:    {pass_id}")
    print(f"Model:      {args.model}")
    print(f"Elements:   {element_char_ids}")
    print(f"Duration:   {duration}s")
    print(f"Aspect:     {args.aspect_ratio}")
    print(f"Audio:      {'OFF' if args.no_audio else 'ON'}")
    print(f"Prompt ({len(args.prompt.split())} words):")
    print("  " + args.prompt.replace("\n", "\n  "))

    # Pre-dispatch validation — catches the 6 classes of burn from 2026-04-22
    if not args.skip_validator:
        from recoil.pipeline._lib.dispatch_validator import validate_dispatch

        payload_intent = {
            "elements": elements_list,
            "image_urls": image_urls_payload,
            "start_image_url": str(start_frame) if start_frame else None,
            "prompt": args.prompt,
        }
        v_result = validate_dispatch(
            model="kling-o3",
            endpoint="reference-to-video",
            payload=payload_intent,
            project=args.project,
            projects_root=projects_root(),
            shot_id=args.shot_canonical,
            element_ids=[c.lower() for c in raw_element_ids],
            image_ref_ids=[r.lower() for r in image_ref_ids],
            aspect_ratio=args.aspect_ratio,
            prompt=args.prompt or "",
            force=args.force,
        )
        print()
        print("=== Pre-dispatch validator ===")
        print(v_result.format_report())
        if not v_result.passed:
            print()
            print(
                "[BLOCKED] Validator errors must be fixed. --force bypasses warnings only."
            )
            return 2

    if args.dry_run:
        print()
        print("=== DRY RUN — not submitting ===")
        return 0

    # Imported here rather than at function entry: argument validation and
    # --dry-run above do not need the execution stack.
    from recoil.execution.step_runner import StepRunner
    from recoil.execution.step_types import ProjectPaths

    # Episode 1 folder used as output container for client projects
    paths = ProjectPaths.for_episode(args.project, 1)
    paths.video_dir.mkdir(parents=True, exist_ok=True)
    runner = StepRunner(
        store=store,
        paths=paths,
        validate_frames=not args.no_validate_frames,
        episode=1,
    )

    t0 = time.time()
    ctx = DispatchContext(
        caller_id="dispatch_cli",
        step_runner=runner,
        project=args.project,
        episode=_derive_episode_int(pass_id),
    )
    # payload_assembly convergence (Phase 4, 2026-05-25): replace inline
    # payload dict with build_unified_payload. Kling O3 elements remains
    # modality=video_i2v (not r2v_multi) — the modality registry routes it
    # via the video_i2v runner; elements_payload + image_urls_payload are
    # passed through as Kling-O3-specific keys. fal.ai uploads happen above
    # (CLI-edge); only the dict construction moves through the assembler.
    pctx = PayloadContext(
        project=args.project,
        modality="video_i2v",
        shot_id=pass_id,
        prompt=args.prompt,
        start_frame_path=start_frame,
        end_frame_path=end_frame,
        model_id=args.model,
        duration_s=float(duration),
        aspect_ratio=args.aspect_ratio,
        generate_audio=not args.no_audio,
        negative_prompt=args.negative_prompt,
        elements_payload=elements_payload,
        image_urls_payload=image_urls_payload,
    )
    payload = build_unified_payload(pctx)
    receipt = dispatch("video_i2v", payload, context=ctx)
    result = receipt.run_result
    elapsed = time.time() - t0

    cost_usd = read_cost_from_result(result)

    if result.success:
        # Phase 9 — register test dispatch (kling_o3_elements path)
        _register_test_dispatch_with_passstore(
            project=args.project,
            original_pass_id=pass_id,
            segment_shot_ids=[pass_id],
            model=args.model,
            prompt=args.prompt,
            output_path=str(result.output_path) if result.output_path else None,
            cost_usd=cost_usd,
            latency_seconds=elapsed,
            start_frame=str(start_frame) if start_frame else None,
            refs=element_char_ids,
            duration=duration,
            aspect_ratio=args.aspect_ratio,
        )

    status = "OK" if result.success else "FAIL"
    print()
    print(f"[{status}] {pass_id} -> {result.output_path} (${cost_usd:.2f})")
    if result.error:
        print(f"Error: {result.error}")
    print(f"Done in {elapsed:.0f}s")
    return 0 if result.success else 1


def _dispatch_seedance_i2v(args, store) -> int:
    """Standalone Seedance image-to-video dispatch for ad-hoc client shots.

    Needs a first frame and a prompt; a last frame and reference images
    (--seedance-refs) are optional. No --shot/plan is required. Per the
    routing note this path is chosen when --model is seeddance*,
    --start-frame is given, and --shot/--shots are absent.

    Multi-shot mode: when --segments is given, each segment receives the same
    integer share of --duration (at least 1s) and the segments are folded into
    a single timestamped prompt, so ONE Seedance job is dispatched instead of
    one per segment. Note that this overwrites ``args.prompt``.

    Returns 0 on success or --dry-run, 1 for a missing file / unresolved ref
    / failed dispatch, and 2 for missing required arguments.
    """
    from recoil.execution.step_runner import StepRunner
    from recoil.execution.step_types import ProjectPaths

    # Multi-shot: collapse --segments into one combined prompt.
    if args.segments:
        from recoil.pipeline._lib.prompt_engine import (
            build_seedance_i2v_multishot_prompt,
        )

        segment_count = len(args.segments)
        per_segment_s = max(1, args.duration // segment_count)
        timed_segments = [
            {"prompt": text, "duration": per_segment_s} for text in args.segments
        ]
        args.prompt = build_seedance_i2v_multishot_prompt(timed_segments)
        print(
            f"Multi-shot: {segment_count} segments × {per_segment_s}s = "
            f"{segment_count * per_segment_s}s combined prompt"
        )

    # Required arguments (usage errors -> exit code 2).
    if not args.prompt:
        print("ERROR: --prompt or --segments required for Seedance I2V mode.")
        return 2
    if not args.start_frame:
        print("ERROR: --start-frame required for Seedance I2V mode.")
        return 2

    first_frame = Path(args.start_frame)
    if not first_frame.exists():
        print(f"ERROR: --start-frame not found: {first_frame}")
        return 1
    print(f"  start frame: {first_frame.name}")

    last_frame = Path(args.end_frame) if args.end_frame else None
    if last_frame is not None:
        if not last_frame.exists():
            print(f"ERROR: --end-frame not found: {last_frame}")
            return 1
        print(f"  end frame:   {last_frame.name}")

    # Resolve --seedance-refs in the order given; blank tokens are skipped.
    resolved_refs: list[str] = []
    ref_tokens = args.seedance_refs.split(",") if args.seedance_refs else []
    for token in ref_tokens:
        asset_id = token.strip()
        if asset_id:
            ref_file = _resolve_client_ref(args.project, asset_id)
            if ref_file is None:
                print(
                    f"ERROR: cannot resolve ref for asset '{asset_id}' in project '{args.project}'."
                )
                return 1
            resolved_refs.append(str(ref_file))
            print(f"  @Ref{len(resolved_refs)} = {asset_id:<20s} -> {ref_file.name}")

    pass_id = args.pass_id
    if not pass_id:
        # No explicit pass id: derive one from the current epoch second.
        pass_id = f"SEEDANCE_I2V_{int(time.time())}"
    clip_seconds = args.duration

    print()
    print("Mode:       Seedance I2V (first+last frame interpolation)")
    print(f"Pass ID:    {pass_id}")
    print(f"Model:      {args.model}")
    print(f"Duration:   {clip_seconds}s")
    print(f"Aspect:     {args.aspect_ratio}")
    print(f"Resolution: {args.resolution or 'default (720p)'}")
    print(f"Audio:      {'OFF' if args.no_audio else 'ON'}")
    print(f"Refs:       {len(resolved_refs)} image refs")
    print(f"Prompt ({len(args.prompt.split())} words):")
    print("  " + args.prompt.replace("\n", "\n  "))

    if args.dry_run:
        print()
        print("=== DRY RUN — not submitting ===")
        return 0

    # The episode-1 folder is the output container for this ad-hoc path.
    episode_paths = ProjectPaths.for_episode(args.project, 1)
    episode_paths.video_dir.mkdir(parents=True, exist_ok=True)
    step_runner = StepRunner(
        store=store,
        paths=episode_paths,
        validate_frames=not args.no_validate_frames,
        episode=1,
    )

    started_at = time.time()
    dispatch_ctx = DispatchContext(
        caller_id="dispatch_cli",
        step_runner=step_runner,
        project=args.project,
        episode=_derive_episode_int(pass_id),
    )
    # payload_assembly convergence (Phase 3, 2026-05-25): the payload dict is
    # built by the shared assembler. The refs were already resolved above, so
    # they are handed over as paths and build_unified_payload does not need
    # to resolve them again.
    reference_paths = [Path(ref) for ref in resolved_refs] if resolved_refs else None
    payload_ctx = PayloadContext(
        project=args.project,
        modality="video_i2v",
        shot_id=pass_id,
        prompt=args.prompt,
        start_frame_path=first_frame,
        end_frame_path=last_frame,
        reference_image_paths=reference_paths,
        model_id=args.model,
        duration_s=float(clip_seconds),
        aspect_ratio=args.aspect_ratio,
        generate_audio=not args.no_audio,
        negative_prompt=getattr(args, "negative_prompt", None),
    )
    receipt = dispatch(
        "video_i2v", build_unified_payload(payload_ctx), context=dispatch_ctx
    )
    run_result = receipt.run_result
    elapsed = time.time() - started_at

    cost_usd = read_cost_from_result(run_result)

    if run_result.success:
        # Phase 9 — register test dispatch (seedance_i2v path). The record
        # keeps the asset ids as typed on the CLI, not the resolved files.
        recorded_refs = [
            token.strip()
            for token in (args.seedance_refs or "").split(",")
            if token.strip()
        ]
        _register_test_dispatch_with_passstore(
            project=args.project,
            original_pass_id=pass_id,
            segment_shot_ids=[pass_id],
            model=args.model,
            prompt=args.prompt,
            output_path=str(run_result.output_path) if run_result.output_path else None,
            cost_usd=cost_usd,
            latency_seconds=elapsed,
            start_frame=str(first_frame) if first_frame else None,
            refs=recorded_refs,
            duration=clip_seconds,
            aspect_ratio=args.aspect_ratio,
        )

    status = "OK" if run_result.success else "FAIL"
    print()
    print(f"[{status}] {pass_id} -> {run_result.output_path} (${cost_usd:.2f})")
    if run_result.error:
        print(f"Error: {run_result.error}")
    print(f"Done in {elapsed:.0f}s")
    return 0 if run_result.success else 1


# Endpoint keys accepted for --v2v-endpoint by the Kling V2V edit dispatch.
_VALID_V2V_ENDPOINTS = {"o3_edit_standard", "o3_edit_pro", "o1_edit_standard"}


def _dispatch_kling_v2v_edit(args, store) -> int:
    """Kling V2V edit dispatch (self-contained) — source video + image refs +
    prompt → edited video. Routes through dispatch("video_i2v") with
    reference_videos and provider_hints={"endpoint": <v2v_endpoint>}.

    Triggered when --model is kling-o3, --ref-video is provided, and
    --v2v-endpoint is one of o3_edit_standard / o3_edit_pro / o1_edit_standard.
    Image refs come via --image-refs (resolved through _resolve_client_view,
    supports view-tagged form: ASSET:three_quarter) or --image-paths.

    Args:
        args: Parsed CLI namespace (argparse or SimpleNamespace).
        store: Execution store handed to StepRunner.

    Returns:
        0 on success or --dry-run; 1 for a missing file/ref or a failed
        dispatch; 2 for usage errors (missing/invalid arguments).
    """
    # Required arguments first (usage errors -> exit code 2).
    if not args.prompt:
        print("ERROR: --prompt is required for Kling V2V edit.")
        return 2

    # Guard explicitly: Path(None) would raise TypeError for callers that
    # reach this function without --ref-video set.
    if not args.ref_video:
        print("ERROR: --ref-video is required for Kling V2V edit.")
        return 2

    if args.v2v_endpoint not in _VALID_V2V_ENDPOINTS:
        print(
            f"ERROR: --v2v-endpoint must be one of {sorted(_VALID_V2V_ENDPOINTS)}, "
            f"got {args.v2v_endpoint!r}."
        )
        return 2

    # Resolve source video
    video_path = Path(args.ref_video).expanduser().resolve()
    if not video_path.exists():
        print(f"ERROR: --ref-video not found: {video_path}")
        return 1

    # Resolve image refs — supports view-tagged syntax (`JADE:three_quarter`)
    # OR `--image-paths` escape hatch. Mutually exclusive.
    image_paths: list[Path] = []
    image_ref_ids: list[str] = []
    if args.image_refs and args.image_paths:
        print("ERROR: --image-refs and --image-paths are mutually exclusive.")
        return 2
    if args.image_paths:
        try:
            image_paths = _parse_image_paths_arg(args.image_paths, args.project)
        except FileNotFoundError as e:
            print(f"ERROR: {e}")
            return 1
        # Literal paths have no asset id; the upper-cased file stem stands in.
        image_ref_ids = [p.stem.upper() for p in image_paths]
    elif args.image_refs:
        try:
            parsed = _parse_image_refs_arg(args.image_refs)
        except ValueError as e:
            print(f"ERROR: --image-refs: {e}")
            return 2
        for aid, view in parsed:
            ref_path = _resolve_client_view(args.project, aid, view)
            if ref_path is None:
                print(
                    f"ERROR: no {view!r} ref found for image-ref "
                    f"'{aid}' in project {args.project!r}"
                )
                return 1
            image_paths.append(ref_path)
            image_ref_ids.append(aid)

    pass_id = args.pass_id or f"KLING_V2V_EDIT_{int(time.time())}"

    print()
    print("Mode:        Kling V2V edit (source video + image refs)")
    print(f"Project:     {args.project}")
    print(f"Pass ID:     {pass_id}")
    print(f"Model:       {args.model}")
    print(f"Endpoint:    {args.v2v_endpoint}")
    print(
        f"Source video: {video_path.name}  ({video_path.stat().st_size / 1_048_576:.1f} MB)"
    )
    print(f"Image refs:  {len(image_paths)}")
    for i, ip in enumerate(image_paths, 1):
        print(f"  @Image{i} = {ip.name}")
    print(f"Keep audio:  {args.keep_audio}")
    print(f"Prompt ({len(args.prompt.split())} words):")
    print("  " + args.prompt.replace("\n", "\n  "))

    if args.dry_run:
        print()
        print("=== DRY RUN — not submitting ===")
        return 0

    # Imported here rather than at function entry: argument validation and
    # --dry-run above do not need the execution stack.
    from recoil.execution.step_runner import StepRunner
    from recoil.execution.step_types import ProjectPaths

    # Episode 1 folder used as output container for client/probe projects
    paths = ProjectPaths.for_episode(args.project, 1)
    paths.video_dir.mkdir(parents=True, exist_ok=True)
    runner = StepRunner(
        store=store,
        paths=paths,
        validate_frames=not args.no_validate_frames,
        episode=1,
    )

    provider_hints = {
        "endpoint": args.v2v_endpoint,
        "keep_audio": bool(args.keep_audio),
    }

    t0 = time.time()
    ctx = DispatchContext(
        caller_id="dispatch_cli",
        step_runner=runner,
        project=args.project,
        episode=_derive_episode_int(pass_id),
    )
    receipt = dispatch(
        "video_i2v",
        {
            "shot_id": pass_id,
            "prompt": args.prompt,
            "model": args.model,
            "reference_videos": [str(video_path)],
            # An empty ref list is sent as None rather than [].
            "reference_images": [str(p) for p in image_paths] or None,
            "provider_hints": provider_hints,
            "generate_audio": False,
        },
        context=ctx,
    )
    result = receipt.run_result
    elapsed = time.time() - t0

    cost_usd = read_cost_from_result(result)

    if result.success:
        _register_test_dispatch_with_passstore(
            project=args.project,
            original_pass_id=pass_id,
            segment_shot_ids=[pass_id],
            model=args.model,
            prompt=args.prompt,
            output_path=str(result.output_path) if result.output_path else None,
            cost_usd=cost_usd,
            latency_seconds=elapsed,
            start_frame=None,
            refs=image_ref_ids,
            duration=None,
            aspect_ratio=None,
        )

    status = "OK" if result.success else "FAIL"
    print()
    print(f"[{status}] {pass_id} -> {result.output_path} (${cost_usd:.2f})")
    if result.error:
        print(f"Error: {result.error}")
    print(f"Done in {elapsed:.0f}s")
    return 0 if result.success else 1


def _dispatch_seedance_v2v_edit(args, store) -> int:
    """Edit a source video with Seedance via the reference-to-video endpoint.

    Seedance has no dedicated V2V endpoint, so the source clip is sent as the
    first entry of `reference_videos`, followed by any extra clips from
    --ref-video (comma-separated paths). The job goes through
    dispatch("video_i2v", ...) with
    `provider_hints={"tier": "standard_720p" | "fast_720p"}`; per the routing
    notes, the fal adapter's `_infer_action` sees the non-empty
    `reference_videos` and targets
    `bytedance/seedance-2.0/{fast/}reference-to-video`.

    Selected when --model is seeddance* AND --source-video is provided AND no
    --shot / --shots are given (mode-detection branch).

    Image refs are optional and come from --image-refs (resolved through
    _resolve_client_view; view-tagged form ASSET:three_quarter is supported)
    or from --image-paths, never both. Limits enforced here: at most 9 image
    refs and at most 3 reference videos in total (source + extras).

    Returns 0 on success or --dry-run, 1 for a missing file / unresolved ref
    / exceeded ref limit / failed dispatch, and 2 for invalid arguments.
    """
    from recoil.execution.step_runner import StepRunner
    from recoil.execution.step_types import ProjectPaths

    if not args.prompt:
        print("ERROR: --prompt is required for Seedance V2V edit.")
        return 2

    # Source video: required, and it must exist on disk.
    if not args.source_video:
        print("ERROR: --source-video is required for Seedance V2V edit.")
        return 2
    source_path = Path(args.source_video).expanduser().resolve()
    if not source_path.exists():
        print(f"ERROR: --source-video not found: {source_path}")
        return 1

    # argparse already restricts --tier to these two values at the CLI; the
    # check stays for SimpleNamespace callers that bypass argparse.
    tier_by_flag = {"standard": "standard_720p", "fast": "fast_720p"}
    if args.tier not in tier_by_flag:
        print(f"ERROR: --tier must be 'standard' or 'fast', got {args.tier!r}.")
        return 2
    tier = tier_by_flag[args.tier]

    # Duration must fall in 4-15s (default 5); out-of-range values are
    # rejected, not adjusted.
    duration = int(args.duration or 5)
    if not 4 <= duration <= 15:
        print(f"ERROR: --duration must be 4-15 for Seedance, got {duration}.")
        return 2

    # Aspect ratio whitelist
    if args.aspect_ratio not in _SEEDANCE_VALID_AR:
        print(
            f"ERROR: --aspect-ratio must be one of {sorted(_SEEDANCE_VALID_AR)}, "
            f"got {args.aspect_ratio!r}."
        )
        return 2

    # Image refs are optional here: the source video already supplies the
    # spatial-temporal structure, refs only pin identity / wardrobe.
    if args.image_refs and args.image_paths:
        print("ERROR: --image-refs and --image-paths are mutually exclusive.")
        return 2
    image_paths: list[Path] = []
    image_ref_ids: list[str] = []
    if args.image_paths:
        try:
            image_paths = _parse_image_paths_arg(args.image_paths, args.project)
        except FileNotFoundError as exc:
            print(f"ERROR: {exc}")
            return 1
        image_ref_ids = [path.stem.upper() for path in image_paths]
    elif args.image_refs:
        try:
            ref_specs = _parse_image_refs_arg(args.image_refs)
        except ValueError as exc:
            print(f"ERROR: --image-refs: {exc}")
            return 2
        for asset_id, view in ref_specs:
            located = _resolve_client_view(args.project, asset_id, view)
            if located is None:
                print(
                    f"ERROR: no {view!r} ref found for image-ref "
                    f"'{asset_id}' in project {args.project!r}"
                )
                return 1
            image_paths.append(located)
            image_ref_ids.append(asset_id)

    image_count = len(image_paths)
    if image_count > 9:
        print(f"ERROR: Seedance V2V accepts max 9 image refs, got {image_count}.")
        return 1

    # Extra reference clips; the total (source + extras) is capped at 3 per
    # fal.ai schema.
    extra_video_paths: list[Path] = []
    video_tokens = args.ref_video.split(",") if args.ref_video else []
    for token in video_tokens:
        candidate = token.strip()
        if candidate:
            extra_clip = Path(candidate).expanduser().resolve()
            if not extra_clip.exists():
                print(f"ERROR: --ref-video path not found: {extra_clip}")
                return 1
            extra_video_paths.append(extra_clip)

    total_videos = len(extra_video_paths) + 1
    if total_videos > 3:
        print(
            f"ERROR: Seedance V2V accepts max 3 reference videos "
            f"(source + extras), got {total_videos}."
        )
        return 1

    pass_id = args.pass_id
    if not pass_id:
        # No explicit pass id: derive one from the current epoch second.
        pass_id = f"SEEDANCE_V2V_EDIT_{int(time.time())}"

    # Per-second rate used only for the printed estimate below.
    if tier == "standard_720p":
        cost_per_second = 0.3034
    else:
        cost_per_second = 0.2419
    est_cost = cost_per_second * duration

    print()
    print("Mode:        Seedance V2V edit (source video → reference-to-video)")
    print(f"Project:     {args.project}")
    print(f"Pass ID:     {pass_id}")
    print("Model:       seeddance-2.0")
    print(f"Tier:        {args.tier} ({tier})")
    print(
        f"Endpoint:    bytedance/seedance-2.0/"
        f"{'fast/' if tier.startswith('fast') else ''}reference-to-video"
    )
    print(
        f"Source video: {source_path.name}  "
        f"({source_path.stat().st_size / 1_048_576:.1f} MB)"
    )
    if extra_video_paths:
        print(f"Extra videos: {len(extra_video_paths)}")
        # Numbering starts at 2: the source video occupies slot 1.
        for slot, clip in enumerate(extra_video_paths, start=2):
            print(
                f"  @Video{slot} = {clip.name}  ({clip.stat().st_size / 1_048_576:.1f} MB)"
            )
    print(f"Image refs:  {len(image_paths)}")
    for slot, image in enumerate(image_paths, start=1):
        print(f"  @Image{slot} = {image.name}")
    print(f"Duration:    {duration}s")
    print(f"Aspect:      {args.aspect_ratio}")
    print(f"Audio:       {'OFF' if args.no_audio else 'ON'}")
    print(f"Est. cost:   ${est_cost:.2f} ({cost_per_second:.4f}/s × {duration}s)")
    print(f"Prompt ({len(args.prompt.split())} words):")
    print("  " + args.prompt.replace("\n", "\n  "))

    if args.dry_run:
        print()
        print("=== DRY RUN — not submitting ===")
        return 0

    # Episode 1 folder used as output container for client/probe projects
    episode_paths = ProjectPaths.for_episode(args.project, 1)
    episode_paths.video_dir.mkdir(parents=True, exist_ok=True)
    step_runner = StepRunner(
        store=store,
        paths=episode_paths,
        validate_frames=not args.no_validate_frames,
        episode=1,
    )

    # Source clip first, extras after it, all as plain strings.
    reference_videos_payload = [str(clip) for clip in (source_path, *extra_video_paths)]

    started_at = time.time()
    dispatch_ctx = DispatchContext(
        caller_id="seedance_v2v",
        step_runner=step_runner,
        project=args.project,
        episode=_derive_episode_int(pass_id),
    )
    request = {
        "shot_id": pass_id,
        "prompt": args.prompt,
        "model": "seeddance-2.0",
        "duration": duration,
        "aspect_ratio": args.aspect_ratio,
        "reference_videos": reference_videos_payload,
        # An empty ref list is sent as None rather than [].
        "reference_images": [str(image) for image in image_paths] or None,
        "provider_hints": {"tier": tier},
        "generate_audio": not args.no_audio,
    }
    receipt = dispatch("video_i2v", request, context=dispatch_ctx)
    run_result = receipt.run_result
    elapsed = time.time() - started_at

    cost_usd = read_cost_from_result(run_result)

    if run_result.success:
        _register_test_dispatch_with_passstore(
            project=args.project,
            original_pass_id=pass_id,
            segment_shot_ids=[pass_id],
            model="seeddance-2.0",
            prompt=args.prompt,
            output_path=str(run_result.output_path) if run_result.output_path else None,
            cost_usd=cost_usd,
            latency_seconds=elapsed,
            start_frame=None,
            refs=image_ref_ids,
            duration=duration,
            aspect_ratio=args.aspect_ratio,
        )

    status = "OK" if run_result.success else "FAIL"
    print()
    print(f"[{status}] {pass_id} -> {run_result.output_path} (${cost_usd:.2f})")
    if run_result.error:
        print(f"Error: {run_result.error}")
    print(f"Done in {elapsed:.0f}s")
    return 0 if run_result.success else 1


def _dispatch_seedance_r2v(args, store) -> int:
    """Seedance R2V dispatch — uploads image + video refs, submits via
    StepRunner.execute_pass(), prints result. Returns process exit code.

    Triggered when --model is seeddance* AND (--ref-video or --seedance-refs).
    Self-contained: does not require --shot/--shots/--plan-dir.
    """
    from recoil.execution.step_runner import StepRunner
    from recoil.execution.step_types import ProjectPaths

    if not args.prompt:
        print("ERROR: --prompt is required for Seedance R2V mode.")
        return 2

    # Resolve image refs by ID (preserving order — determines @Image1..N)
    ref_image_paths: list[Path] = []
    if args.seedance_refs:
        for raw in args.seedance_refs.split(","):
            aid = raw.strip()
            if not aid:
                continue
            p = _resolve_client_ref(args.project, aid)
            if p is None:
                print(
                    f"ERROR: cannot resolve ref for asset '{aid}' in project '{args.project}'."
                )
                print(f"  Tried refs/{{characters|props|locations}}/{aid.lower()}/...")
                return 1
            ref_image_paths.append(p)
            rel = p.relative_to(projects_root() / args.project)
            print(f"  @Image{len(ref_image_paths)} = {aid:<20s} -> {rel}")

    if len(ref_image_paths) > 9:
        print(
            f"ERROR: Seedance R2V accepts max 9 image refs, got {len(ref_image_paths)}."
        )
        return 1

    # Resolve video refs (local paths)
    ref_video_paths: list[Path] = []
    if args.ref_video:
        for raw in args.ref_video.split(","):
            vp_str = raw.strip()
            if not vp_str:
                continue
            vp = Path(vp_str)
            if not vp.exists():
                print(f"ERROR: --ref-video path not found: {vp}")
                return 1
            ref_video_paths.append(vp)
            print(
                f"  @Video{len(ref_video_paths)} = {vp.name}  ({vp.stat().st_size / 1_048_576:.1f} MB)"
            )

    if len(ref_video_paths) > 3:
        print(
            f"ERROR: Seedance R2V accepts max 3 video refs, got {len(ref_video_paths)}."
        )
        return 1

    # Resolve audio refs (local paths) — native audio channel for Seedance R2V
    ref_audio_paths: list[Path] = []
    if args.audio_url:
        for raw in args.audio_url.split(","):
            ap_str = raw.strip()
            if not ap_str:
                continue
            ap = Path(ap_str)
            if not ap.exists():
                print(f"ERROR: --audio-url path not found: {ap}")
                return 1
            ref_audio_paths.append(ap)
            print(
                f"  @Audio{len(ref_audio_paths)} = {ap.name}  ({ap.stat().st_size / 1024:.1f} KB)"
            )

    if len(ref_audio_paths) > 3:
        print(
            f"ERROR: Seedance R2V accepts max 3 audio refs, got {len(ref_audio_paths)}."
        )
        return 1

    if not ref_image_paths and not ref_video_paths:
        print("ERROR: R2V requires at least one reference (image or video).")
        return 2

    pass_id = args.pass_id or f"SEEDANCE_R2V_{int(time.time())}"
    duration = args.duration

    # Derive segment timestamps from the prompt's [Xs-Ys] markers, if present,
    # so boundary-frame extraction aligns with the sequence structure. Falls
    # back to a single segment covering the full duration.
    import re as _re

    seg_matches = _re.findall(
        r"\[(\d+(?:\.\d+)?)\s*s\s*[-\u2013]\s*(\d+(?:\.\d+)?)\s*s\]", args.prompt
    )
    if seg_matches:
        expected_segment_timestamps = [(float(a), float(b)) for a, b in seg_matches]
        segment_shot_ids = [
            f"{pass_id}_SEG{i + 1:02d}" for i in range(len(seg_matches))
        ]
    else:
        expected_segment_timestamps = [(0.0, float(duration))]
        segment_shot_ids = [pass_id]

    print()
    print("Mode:       Seedance R2V (video + image reference)")
    print(f"Pass ID:    {pass_id}")
    print(f"Model:      {args.model}")
    print(f"Duration:   {duration}s")
    print(f"Aspect:     {args.aspect_ratio}")
    print(f"Resolution: {args.resolution or 'default (720p)'}")
    print(f"Audio:      {'OFF' if args.no_audio else 'ON'}")
    print(
        f"Segments:   {len(segment_shot_ids)}  timestamps={expected_segment_timestamps}"
    )
    print(f"Image refs: {len(ref_image_paths)}")
    print(f"Video refs: {len(ref_video_paths)}")
    print(f"Audio refs: {len(ref_audio_paths)}")
    print(f"Prompt ({len(args.prompt.split())} words):")
    print("  " + args.prompt.replace("\n", "\n  "))

    if args.dry_run:
        print()
        print("=== DRY RUN — not submitting ===")
        return 0

    # Episode 1 folder is used as output container for client projects.
    paths = ProjectPaths.for_episode(args.project, 1)
    paths.video_dir.mkdir(parents=True, exist_ok=True)
    runner = StepRunner(
        store=store,
        paths=paths,
        validate_frames=not args.no_validate_frames,
        episode=1,
    )

    t0 = time.time()
    result = runner.execute_pass(
        pass_id=pass_id,
        prompt=args.prompt,
        reference_image_paths=ref_image_paths,
        segment_shot_ids=segment_shot_ids,
        expected_segment_timestamps=expected_segment_timestamps,
        model=args.model,
        duration=duration,
        aspect_ratio=args.aspect_ratio,
        resolution=args.resolution,
        reference_video_paths=ref_video_paths,
        reference_audio_paths=ref_audio_paths,
        generate_audio=not args.no_audio,
    )
    elapsed = time.time() - t0

    if result.success:
        # Phase 9 — register test dispatch (seedance_r2v path)
        _refs_for_record = [
            a.strip() for a in (args.seedance_refs or "").split(",") if a.strip()
        ]
        _register_test_dispatch_with_passstore(
            project=args.project,
            original_pass_id=result.pass_id or pass_id,
            segment_shot_ids=segment_shot_ids,
            model=args.model,
            prompt=args.prompt,
            output_path=str(result.video_path) if result.video_path else None,
            cost_usd=float(result.cost_usd or 0.0),
            latency_seconds=elapsed,
            start_frame=None,
            refs=_refs_for_record,
            duration=duration,
            aspect_ratio=args.aspect_ratio,
        )

    status = "OK" if result.success else "FAIL"
    print()
    print(
        f"[{status}] {result.pass_id} -> {result.video_path} (${result.cost_usd:.2f})"
    )
    if result.error:
        print(f"Error: {result.error}")
    if result.api_metadata:
        print(f"API metadata: {result.api_metadata}")
    print(f"Done in {elapsed:.0f}s")
    return 0 if result.success else 1


def _dry_run_dump(label: str, payload, *, prompt: str | None = None) -> int:
    """Print FULL payload + prompt for --dry-run. Return 0 (caller returns this).

    Cost-safety gate: every dispatch()/runner.execute_*() call in main() must
    be preceded by `if args.dry_run: return _dry_run_dump("label", payload)`.
    See memory/feedback-dispatch-cli-r2v-multi-dry-run-hole.md.
    """
    print(f"\n=== DRY RUN — {label} (NOT submitting) ===")
    if prompt is not None:
        print(f"--- prompt ({len(prompt.split())} words, {len(prompt)} chars) ---")
        print(prompt)
        print("--- end prompt ---")
    print("--- payload ---")

    def _serialize(o):
        if isinstance(o, Path):
            return str(o)
        # Try dataclasses first (deterministic, no signature surprises)
        try:
            from dataclasses import asdict, is_dataclass

            if is_dataclass(o) and not isinstance(o, type):
                return asdict(o)
        except Exception:
            pass
        # Then to_dict() with strict arity check
        to_dict = getattr(o, "to_dict", None)
        if callable(to_dict):
            import inspect

            try:
                sig = inspect.signature(to_dict)
                required = [
                    p
                    for p in sig.parameters.values()
                    if p.default is inspect.Parameter.empty
                    and p.kind
                    not in (
                        inspect.Parameter.VAR_POSITIONAL,
                        inspect.Parameter.VAR_KEYWORD,
                    )
                ]
                if not required:
                    return to_dict()
            except (TypeError, ValueError):
                pass
        return repr(o)

    # Redact prompt from payload if printed separately above — avoid double-print.
    if (
        prompt is not None
        and isinstance(payload, dict)
        and payload.get("prompt") == prompt
    ):
        payload = {**payload, "prompt": "<printed above — redacted from payload dump>"}
    try:
        print(json.dumps(payload, indent=2, default=_serialize))
    except Exception as e:
        print(f"(json serialize failed: {e}; falling back to repr)")
        print(repr(payload))
    print(f"=== end {label} ===\n")
    return 0


def main():
    parser = argparse.ArgumentParser(
        description="Test generation through unified StepRunner pipeline"
    )
    parser.add_argument(
        "--project", required=True, help="Project name (e.g., starsend-test)"
    )
    parser.add_argument(
        "--shot", help="Shot ID for single-shot or coverage (e.g., EP001_SH02)"
    )
    parser.add_argument(
        "--shots",
        help="Comma-separated shot IDs for sequence (e.g., EP001_SH02,EP001_SH02A)",
    )
    parser.add_argument(
        "--per-shot",
        action="store_true",
        help=(
            "When used with --shots, dispatch each shot individually (per-shot mode). "
            "Default is r2v_multi (single multi-shot dispatch). Use --per-shot for "
            "surgical retries inside a batch or continuity overrides—per "
            "pipeline-learnings §26, by-shot is the exception."
        ),
    )
    parser.add_argument(
        "--mode",
        default="standard",
        choices=["standard", "action", "coverage"],
        help="Generation mode",
    )
    parser.add_argument("--model", default="kling-v3", help="Model ID")
    parser.add_argument("--prompt", help="Custom prompt (overrides plan prompt)")
    parser.add_argument(
        "--duration", type=int, default=5, help="Duration in seconds (standard/action)"
    )
    parser.add_argument(
        "--elements",
        help="Comma-separated character IDs for Elements (e.g., KIT or KIT,NAVI). Forces kling-o3 model.",
    )
    parser.add_argument(
        "--veo-refs",
        help="Comma-separated character IDs for Veo reference images (e.g., TORCH). Max 3 images auto-selected.",
    )
    parser.add_argument(
        "--negative-prompt", help="Negative prompt string to pass to StepRunner"
    )
    parser.add_argument(
        "--generate-audio", action="store_true", help="Enable audio generation"
    )
    parser.add_argument(
        "--start-frame", help="Explicit start frame path (overrides hero frame lookup)"
    )
    parser.add_argument(
        "--end-frame", help="End frame path for In Between / sandwich testing"
    )
    parser.add_argument(
        "--wan-refs",
        help="Comma-separated char IDs for Wan R2V refs (e.g., SADIE or SADIE,DUSTY)",
    )
    parser.add_argument(
        "--plan-dir",
        default="visual",
        choices=["visual"],
        help="Plan directory (default: 'visual'). v2 'starsend' namespace was retired in the v3 layout migration; only 'visual' is supported.",
    )
    parser.add_argument(
        "--prompt-style",
        default="balanced",
        choices=["directed", "balanced", "open"],
        help="Wan prompt style: directed, balanced, or open",
    )

    # Seedance R2V (client-project video reference mode)
    parser.add_argument(
        "--seedance-refs",
        help="Comma-separated asset IDs for Seedance R2V reference images "
        "(e.g., DRIVER,DEER,BLUE_CAR,SUBURBAN_STREET). Order determines "
        "@Image1,@Image2,... tag numbering in the prompt. Resolves against "
        "{project}/refs/{characters|props|locations}/{id}/.",
    )
    parser.add_argument(
        "--ref-video",
        help="Reference video path(s) for Seedance R2V. Comma-separated for "
        "multiple clips (max 3 per fal.ai schema). Tagged @Video1,@Video2,... "
        "in the prompt. Triggers R2V dispatch via StepRunner.execute_pass().",
    )
    parser.add_argument(
        "--audio-url",
        help="Reference audio path(s) for Seedance R2V native audio channel. "
        "Comma-separated for multiple tracks (max 3 per fal.ai schema). "
        "Tagged @Audio1,@Audio2,... in the prompt. Used for voiceover / "
        "music / SFX. PREFERRED over bundling audio inside --ref-video — "
        "see pipeline-learnings §17c.",
    )
    parser.add_argument(
        "--pass-id",
        help="Pass identifier for Seedance R2V output filename "
        "(e.g., REGEN_P04). Output saved as {pass_id}_take{N}.mp4.",
    )
    parser.add_argument(
        "--aspect-ratio",
        default=None,
        help="Output aspect ratio. If unset, read from the project's "
        "project_config.json `aspect_ratio` field. Falls back to 9:16 "
        "(vertical microdrama default) if neither is set. "
        "Pass explicitly to override project_config.",
    )
    parser.add_argument(
        "--resolution",
        help="Output resolution (480p/720p/1080p). Defaults per model profile.",
    )
    parser.add_argument(
        "--no-audio",
        action="store_true",
        help="Disable generated audio (Seedance R2V only).",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print final payload without submitting. Use before burning billing.",
    )
    parser.add_argument(
        "--shot-canonical",
        help="Shot ID for canonical-asset lookup (loads "
        "projects/{project}/state/visual/shots/{id}.canonical.json). "
        "Validator warns if dispatch element IDs differ from canonical.",
    )
    parser.add_argument(
        "--image-refs",
        help="Comma-separated asset IDs for Kling O3 R2V image_urls "
        "(@Image1/2/3 tokens — style/composition/start-frame channel, "
        "distinct from --elements identity channel). Supports view-tagged "
        "form: JADE:three_quarter,WREN:profile. Resolves via "
        "_resolve_client_view(). Shared 4-slot budget with elements on "
        "Kling O3 R2V. Mutually exclusive with --image-paths.",
    )
    parser.add_argument(
        "--image-paths",
        dest="image_paths",
        default=None,
        help=(
            "Comma-separated absolute or project-relative paths to image refs. "
            "Mutually exclusive with --image-refs."
        ),
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Downgrade validator warnings to logged bypasses. Errors still block.",
    )
    parser.add_argument(
        "--skip-validator",
        action="store_true",
        help="Skip pre-dispatch validation entirely. Reserve for debugging.",
    )
    parser.add_argument(
        "--no-validate-frames",
        action="store_true",
        help="Disable StartFrameCritic and VideoFrameCritic inside StepRunner. "
        "Use when GEMINI_API_KEY is unavailable or for known-good frames.",
    )
    parser.add_argument(
        "--v2v-endpoint",
        choices=sorted(_VALID_V2V_ENDPOINTS),
        help="Kling video-to-video edit endpoint. When set with --ref-video and "
        "--model kling-o3, routes through Kling V2V edit (replaces probe_kling_v2v_edit.py).",
    )
    parser.add_argument(
        "--source-video",
        help="Source video path for Seedance V2V edit. Becomes element 0 of "
        "`reference_videos` (max 3 total). Triggers Seedance V2V mode when "
        "paired with --model seeddance-2.0 (no --shot/--shots/--start-frame). "
        "Distinct from --ref-video which adds OPTIONAL extra reference videos.",
    )
    parser.add_argument(
        "--tier",
        choices=["standard", "fast"],
        default="standard",
        help="Seedance V2V tier. 'standard' → bytedance/seedance-2.0/"
        "reference-to-video ($0.3034/s). 'fast' → bytedance/seedance-2.0/"
        "fast/reference-to-video ($0.2419/s). Default: standard.",
    )
    parser.add_argument(
        "--keep-audio",
        action="store_true",
        help="Preserve source video's audio track in V2V edit output. "
        "Default off — most edits replace the visual look only.",
    )
    parser.add_argument(
        "--segments",
        nargs="+",
        metavar="PROMPT",
        help="Multiple angle/beat prompts for Seedance multi-shot. "
        "Each value is one segment prompt; durations split evenly across "
        "--duration. Dispatches ONE Seedance job with a combined "
        "[Xs-Ys] timestamped prompt. Example: "
        "--segments 'driver shakes head' 'exterior tight on face' "
        "'bystander low angle' --duration 6",
    )

    args = parser.parse_args()

    # Resolve effective aspect_ratio: CLI flag > project_config > 9:16 fallback.
    # project_config.json is the single source of truth per project (driver-beware
    # declares 16:9, narrative projects declare 9:16). A later validator check
    # warns when the resolved AR differs from project_config — catches silent drift.
    args._aspect_ratio_source = "cli" if args.aspect_ratio else None
    args._project_config_aspect_ratio = None
    args._project_config = None
    if args.project:
        _pc_path = projects_root() / args.project / "project_config.json"
        if _pc_path.exists():
            try:
                with _pc_path.open() as _f:
                    _pc = json.load(_f)
                args._project_config_aspect_ratio = _pc.get("aspect_ratio")
                args._project_config = _pc
            except Exception as _e:
                print(f"WARN: failed to read {_pc_path}: {_e}")
    if args.aspect_ratio is None:
        if args._project_config_aspect_ratio:
            # project_config may store underscore format ("9_16"); normalize to colon
            args.aspect_ratio = args._project_config_aspect_ratio.replace("_", ":")
            args._aspect_ratio_source = "project_config"
            print(
                f"Aspect ratio: {args.aspect_ratio} (from {args.project}/project_config.json)"
            )
        else:
            args.aspect_ratio = "9:16"
            args._aspect_ratio_source = "fallback_default"
            print(
                f"Aspect ratio: {args.aspect_ratio} (fallback default — no project_config.aspect_ratio)"
            )

    # Seedance R2V mode is self-contained — doesn't require --shot/--shots.
    # Note: --seedance-refs alone does NOT imply R2V — _dispatch_seedance_i2v
    # also supports image refs via --seedance-refs. Only treat seedance_refs
    # as an R2V signal when --start-frame is absent (otherwise the user is
    # asking for I2V-with-refs, which the I2V dispatcher handles).
    _is_seedance_r2v = ("seed" in (args.model or "").lower()) and (
        args.ref_video
        or args.audio_url
        or (args.seedance_refs and not args.start_frame)
    )

    # I2V standalone mode — start-frame + optional end-frame, no shot (seedance or kling)
    _model_lower = (args.model or "").lower()

    # No dedicated Seedance V2V endpoint — routes through reference-to-video
    # with the source video as one of `reference_videos`.
    _is_seedance_v2v_edit_standalone = (
        "seed" in _model_lower
        and bool(args.source_video)
        and not args.shot
        and not args.shots
    )

    # Kling V2V edit standalone — source video + image refs + endpoint, no shot.
    # Replaces probe_kling_v2v_edit.py. Triggered by kling-o3 + --ref-video +
    # --v2v-endpoint, regardless of --image-refs (refs are optional for some endpoints).
    _is_kling_v2v_edit_standalone = (
        "kling-o3" in _model_lower
        and bool(args.ref_video)
        and bool(args.v2v_endpoint)
        and not args.shot
        and not args.shots
    )

    # Kling O3 Elements standalone — elements + optional start-frame, no shot.
    # Start frame is OPTIONAL here: on Kling O3 R2V, elements + start_image_url
    # fight architecturally and the validator will block if both are sent.
    _is_kling_o3_elements_standalone = (
        "kling-o3" in _model_lower
        and bool(args.elements)
        and not args.shot
        and not args.shots
    )

    _is_seedance_i2v_standalone = (
        ("seed" in _model_lower or ("kling" in _model_lower and not args.elements))
        and (bool(args.start_frame) or bool(getattr(args, "segments", None)))
        and not args.shot
        and not args.shots
        and not _is_seedance_r2v
        and not _is_kling_o3_elements_standalone
        and not _is_kling_v2v_edit_standalone
        and not _is_seedance_v2v_edit_standalone
    )

    if (
        not args.shot
        and not args.shots
        and not _is_seedance_r2v
        and not _is_seedance_i2v_standalone
        and not _is_kling_o3_elements_standalone
        and not _is_kling_v2v_edit_standalone
        and not _is_seedance_v2v_edit_standalone
    ):
        parser.error(
            "Must provide --shot or --shots (or --seedance-refs/--ref-video for R2V, or --start-frame for I2V standalone, or --elements+--start-frame for Kling O3 Elements standalone, or --ref-video+--v2v-endpoint for Kling V2V edit, or --source-video for Seedance V2V edit)"
        )

    # ── Seedance V2V edit dispatch (source video → reference-to-video) ──
    if _is_seedance_v2v_edit_standalone:
        store = get_store(args.project)
        rc = _dispatch_seedance_v2v_edit(args, store)
        sys.exit(rc)

    # ── Kling V2V edit dispatch (replaces probe_kling_v2v_edit.py) ──
    if _is_kling_v2v_edit_standalone:
        store = get_store(args.project)
        rc = _dispatch_kling_v2v_edit(args, store)
        sys.exit(rc)

    # ── Seedance R2V dispatch (self-contained, client-project aware) ──
    if _is_seedance_r2v:
        store = get_store(args.project)
        rc = _dispatch_seedance_r2v(args, store)
        sys.exit(rc)

    # ── Kling O3 Elements standalone dispatch (client-project ad-hoc) ──
    if _is_kling_o3_elements_standalone:
        store = get_store(args.project)
        rc = _dispatch_kling_o3_elements(args, store)
        sys.exit(rc)

    # ── Seedance I2V standalone dispatch ──
    if _is_seedance_i2v_standalone:
        store = get_store(args.project)
        rc = _dispatch_seedance_i2v(args, store)
        sys.exit(rc)

    # Elements require fal.ai model
    elements_payload = None
    element_char_ids = []
    has_location_element = False
    total_elements = 0

    # Pre-load plan shots for location extraction (needed before build_elements)
    pre_loaded_plan_shots = None
    if args.elements:
        if args.model != "kling-o3":
            print(
                f"NOTE: Elements require O3 — switching model from {args.model} to kling-o3"
            )
            args.model = "kling-o3"

        # Load plan shots so we can extract location for elements
        if args.shots:
            shot_ids = [s.strip() for s in args.shots.split(",")]
            pre_loaded_plan_shots = []
            for sid in shot_ids:
                sd, _, _ = get_plan_shot(args.project, sid, args.plan_dir)
                pre_loaded_plan_shots.append(sd)
        elif args.shot:
            sd, _, _ = get_plan_shot(args.project, args.shot, args.plan_dir)
            pre_loaded_plan_shots = [sd]

        elements_payload, element_char_ids, has_location_element, total_elements = (
            build_elements(
                args.elements, args.project, plan_shots=pre_loaded_plan_shots
            )
        )

    # Veo reference images (character turnaround refs → Ingredients to Video)
    veo_ref_paths = []
    if args.veo_refs:
        char_ids = [c.strip().upper() for c in args.veo_refs.split(",")]
        refs_root = projects_root() / args.project / "output" / "refs" / "characters"
        for cid in char_ids:
            char_dir = refs_root / cid.lower()
            if not char_dir.exists():
                print(f"WARNING: No refs dir for {cid} at {char_dir}")
                continue
            # Pick best 3: front, three_quarter, profile (or hero fallback)
            for name in [
                f"{cid.lower()}_front.png",
                f"{cid.lower()}_three_quarter.png",
                f"{cid.lower()}_profile.png",
                "hero.jpeg",
                "hero.png",
            ]:
                p = char_dir / name
                if p.exists() and len(veo_ref_paths) < 3:
                    veo_ref_paths.append(str(p))
        if veo_ref_paths:
            print(
                f"Veo refs ({len(veo_ref_paths)}): {[Path(p).name for p in veo_ref_paths]}"
            )
        else:
            print("WARNING: No Veo ref images found — proceeding without references")

    store = get_store(args.project)

    from recoil.execution.step_runner import StepRunner
    from recoil.execution.step_types import ProjectPaths

    if args.shots:
        # ── Multi-shot sequence ──
        shot_ids = [s.strip() for s in args.shots.split(",") if s.strip()]
        print(f"Sequence: {shot_ids}")
        print(f"Model: {args.model}")

        import re

        ep_match = re.match(r"EP(\d+)", shot_ids[0])
        ep_num = int(ep_match.group(1)) if ep_match else 1
        paths = ProjectPaths.for_episode(args.project, ep_num)
        runner = StepRunner(
            store=store,
            paths=paths,
            validate_frames=not args.no_validate_frames,
            episode=ep_num,
        )

        # Load plan shots
        batch = []
        plan_data_for_batch: dict = {}
        for sid in shot_ids:
            shot_data, _, _plan = get_plan_shot(args.project, sid, args.plan_dir)
            shot_data["_api_duration"] = max(
                3,
                shot_data.get("routing_data", {}).get("target_editorial_duration_s", 5),
            )
            batch.append(shot_data)
            if not plan_data_for_batch and isinstance(_plan, dict):
                plan_data_for_batch = _plan

        # ── A5 routing fix (R4) — per pipeline-learnings §26, multi-shot is the
        # narrative default for --shots. Old behaviour fell silently through
        # to per-shot dispatch, which lost SH17/SH18 of every batch with
        # `[FAIL] -> None ($0.00)`. New rule:
        #   --shots without --per-shot       -> r2v_multi dispatch (one call)
        #   --shots --per-shot               -> legacy per-shot loop (below)
        if not args.per_shot:
            from recoil.pipeline._lib.prompt_engine import get_builder

            try:
                multi_builder = get_builder(args.model, "r2v_multi")
            except KeyError:
                multi_builder = None
            if multi_builder is None:
                print(
                    f"ERROR: model {args.model!r} has no r2v_multi builder; "
                    f"either select a model that supports r2v_multi (seeddance-2.0) "
                    f"OR pass --per-shot to dispatch each shot individually."
                )
                sys.exit(1)

            bible = (
                plan_data_for_batch.get("bible_stub", {}) if plan_data_for_batch else {}
            )
            config = (
                args._project_config
                if args._project_config is not None
                else {"film_stock": "Kodak Vision3 500T"}
            )
            multi_prompt = multi_builder(
                shots=batch,
                bible=bible,
                project_config=config,
                episode=ep_num,
            )
            if args.dry_run:
                print(
                    f"r2v_multi prompt ({len(multi_prompt.split())} words, "
                    f"{len(multi_prompt)} chars):"
                )
                print(multi_prompt)
            else:
                print(
                    f"r2v_multi prompt ({len(multi_prompt.split())} words): "
                    f"{multi_prompt[:120]}..."
                )

            # Collect reference images across the batch — one merged list, the
            # r2v_multi runner indexes them as @Image1..N.
            reference_images: list[str] = []
            for sd in batch:
                for rp in sd.get("reference_images") or []:
                    if rp not in reference_images:
                        reference_images.append(rp)

            # Auto-resolve refs when plan shots don't have pre-populated
            # reference_images. Check CLI flags first, then shot metadata.
            if not reference_images:
                # Try CLI flags: --seedance-refs or --image-refs
                ref_arg = args.seedance_refs or (
                    args.image_refs
                    if hasattr(args, "image_refs") and args.image_refs
                    else None
                )
                if ref_arg:
                    for raw in ref_arg.split(","):
                        aid = raw.strip().split(":")[0]
                        if not aid:
                            continue
                        p = _resolve_client_ref(args.project, aid)
                        if p is not None and str(p) not in reference_images:
                            reference_images.append(str(p))
                            print(
                                f"  @Image{len(reference_images)} = {aid} -> {p.name}"
                            )

            if not reference_images:
                # Last resort: resolve from shot prompt_data characters + location
                seen_ids: set[str] = set()
                for sd in batch:
                    pd = sd.get("prompt_data", {})
                    for ch in pd.get("characters", []):
                        cid = ch.get("char_id", "")
                        if cid and cid not in seen_ids:
                            p = _resolve_client_ref(args.project, cid)
                            if p is not None:
                                reference_images.append(str(p))
                                seen_ids.add(cid)
                                print(
                                    f"  @Image{len(reference_images)} = {cid} (auto) -> {p.name}"
                                )
                    loc = pd.get("location_id", "")
                    if loc and loc not in seen_ids:
                        p = _resolve_client_ref(args.project, loc)
                        if p is not None:
                            reference_images.append(str(p))
                            seen_ids.add(loc)
                            print(
                                f"  @Image{len(reference_images)} = {loc} (auto) -> {p.name}"
                            )

            if not reference_images:
                print(
                    "WARNING: No reference images found — generation will be text-to-video"
                )

            ctx = DispatchContext(
                caller_id="dispatch_cli",
                step_runner=runner,
                project=args.project,
                episode=ep_num,
            )
            # Converge through build_unified_payload (payload_assembly review
            # follow-up, 2026-05-25): coerce raw batch dicts to CanonicalShot,
            # build the unified payload (gives us aspect_ratio + inputs_snapshot
            # + gate_results + prompt_layers + provider_hints.segment_count),
            # then overlay the legacy top-level shot_ids key used by downstream
            # sidecar/passstore naming. Prompt is pre-built above
            # via multi_builder; pass it through ctx.prompt so the assembler
            # skips its own builder invocation.
            from recoil.pipeline._lib.dispatch_payload import _coerce_canonical_shot

            batch_canon = [_coerce_canonical_shot(sd) for sd in batch]
            r2v_ctx = PayloadContext(
                project=args.project,
                modality="r2v_multi",
                shot_id=shot_ids[0],
                prompt=multi_prompt,
                model_id=args.model,
                duration_s=float(sum(s.get("_api_duration", 0) for s in batch)),
                aspect_ratio=args.aspect_ratio,
                generate_audio=args.generate_audio,
                negative_prompt=args.negative_prompt,
                reference_image_paths=[Path(p) for p in reference_images]
                if reference_images
                else None,
                batch_shots=batch_canon,
                episode=str(ep_num),
            )
            r2v_payload = build_unified_payload(r2v_ctx)
            r2v_payload["shot_ids"] = shot_ids
            if args.dry_run:
                return _dry_run_dump(
                    f"r2v_multi {args.model} shots={shot_ids}",
                    r2v_payload,
                    prompt=multi_prompt,
                )
            t0 = time.time()
            receipt = dispatch("r2v_multi", r2v_payload, context=ctx)
            result = receipt.run_result
            elapsed = time.time() - t0
            cost_usd = read_cost_from_result(result)

            # A5/A4 — every batch output gets a sidecar via Phase 3's helper.
            if result.success and result.output_path:
                try:
                    from recoil.pipeline._lib.sidecar import (
                        populate_sidecar,
                        write_sidecar_dict,
                    )

                    output_path = Path(str(result.output_path))
                    sidecar_path = output_path.with_suffix(output_path.suffix + ".json")
                    tag = _derive_multi_shot_tag(batch)
                    # R6 Phase 6 — plumb new params: pipeline locked to
                    # "r2v_multi" (matches dispatch + workspace API string).
                    # shot_id derived from first member of the batch.
                    _r2v_shot_id = batch[0].get("shot_id") if batch else None
                    _r2v_dispatch_path = (
                        getattr(ctx.step_runner, "_dispatch_path", None)
                        if "ctx" in locals()
                        else None
                    )
                    sidecar_dict = populate_sidecar(
                        receipt=receipt,
                        payload={
                            "model": args.model,
                            "modality": "video_i2v",
                            "duration": r2v_payload["duration"],
                            "prompt": multi_prompt,
                            "reference_images": reference_images,
                            "video_path": str(output_path),
                        },
                        refs_used=reference_images,
                        tag=tag,
                        project=args.project,
                        pipeline="r2v_multi",
                        dispatch_path=_r2v_dispatch_path or "unknown",
                        shot_id=_r2v_shot_id,
                        generation_params={
                            "duration": r2v_payload["duration"],
                            "aspect_ratio": args.aspect_ratio,
                            "batch_shot_ids": shot_ids,
                        },
                    )
                    write_sidecar_dict(sidecar_path, sidecar_dict)
                    print(f"Sidecar: {sidecar_path}")
                    # R5 SYNTHESIS §1.8 — post-fire inspect-sidecars.
                    # Non-blocking; surfaces prod-vs-synthetic drift.
                    _post_fire_inspect_sidecar(args, sidecar_path)
                except Exception as _sc_err:
                    print(f"WARNING: sidecar write failed: {_sc_err}")

            status = "OK" if result.success else "FAIL"
            print(
                f"[{status}] r2v_multi {shot_ids} -> {result.output_path} "
                f"(${cost_usd:.2f})"
            )
            if result.error:
                print(f"Error: {result.error}")
            print(f"Done in {elapsed:.0f}s")
            return

        # Per-shot path: --per-shot flag set. The legacy execute_multi_shot
        # call below falls through to _execute_sequential_shots for any model
        # whose api_pattern isn't kling_rest — that's the per-shot path the
        # spec keeps for surgical retries / continuity overrides. Sidecars
        # now ride on the success branch below so A4 doesn't regress.
        from recoil.pipeline._lib.prompt_engine import build_multi_prompt_sequence

        sequence = build_multi_prompt_sequence(
            batch,
            batch_char_ids=element_char_ids if elements_payload else None,
            has_location_element=has_location_element,
            total_elements=total_elements,
        )

        start_frame = find_hero_frame(args.project, shot_ids[0], store)
        print(f"Start frame: {start_frame}")
        print(f"Shots: {len(batch)}, Total: {sum(s['_api_duration'] for s in batch)}s")

        if args.dry_run:
            return _dry_run_dump(
                f"multi_shot {args.model} shots={shot_ids}",
                {
                    "batch_size": len(batch),
                    "model": args.model,
                    "start_frame": str(start_frame) if start_frame else None,
                    "elements_payload": elements_payload,
                    "multi_prompt_sequence": sequence,
                },
            )
        t0 = time.time()
        results = runner.execute_multi_shot(
            batch=batch,
            multi_prompt_sequence=sequence,
            model=args.model,
            start_frame=start_frame,
            elements_payload=elements_payload,
        )
        elapsed = time.time() - t0

        # Phase 9 — register test dispatch (Multi-shot sequence path)
        first_ok = next((r for r in results if r.success), None)
        if first_ok is not None:
            _total_cost = float(sum((r.cost_usd or 0.0) for r in results))
            _combined_prompt = "\n".join(
                f"[{seg.get('index', '?')}] {seg.get('prompt', '')}" for seg in sequence
            )
            _register_test_dispatch_with_passstore(
                project=args.project,
                original_pass_id=shot_ids[0],
                segment_shot_ids=shot_ids,
                model=args.model,
                prompt=_combined_prompt,
                output_path=str(first_ok.output_path) if first_ok.output_path else None,
                cost_usd=_total_cost,
                latency_seconds=elapsed,
                start_frame=str(start_frame) if start_frame else None,
                refs=element_char_ids if elements_payload else [],
                duration=int(sum(s.get("_api_duration", 0) for s in batch)),
                aspect_ratio=args.aspect_ratio,
            )

        # A4 sidecar — per-shot fallback (--per-shot) writes a sidecar per
        # successful StepResult so this path doesn't regress A4. The single-
        # shot success block (in the final `else` branch of this if/elif
        # chain) writes one sidecar via populate_sidecar; mirror that
        # contract here, one per result.
        try:
            from recoil.pipeline._lib.sidecar import (
                populate_sidecar,
                write_sidecar_dict,
            )
        except Exception:  # noqa: BLE001
            populate_sidecar = None  # type: ignore[assignment]
            write_sidecar_dict = None  # type: ignore[assignment]

        _shot_by_id = {s.get("shot_id"): s for s in batch}
        _seq_by_id = {
            s.get("shot_id"): seg for s, seg in zip(batch, sequence, strict=False)
        }

        for r in results:
            status = "OK" if r.success else "FAIL"
            print(f"  [{status}] {r.shot_id} → {r.output_path} (${r.cost_usd:.2f})")
            if (
                r.success
                and r.output_path
                and populate_sidecar is not None
                and write_sidecar_dict is not None
            ):
                try:
                    _shot = _shot_by_id.get(r.shot_id, {})
                    _seg = _seq_by_id.get(r.shot_id, {})
                    # Phase 6.5 leak fix — the per-shot fallback was passing
                    # `refs_used=[str(start_frame)]`, dropping the element
                    # ref list every shot in the batch resolved. Union the
                    # shot's reference_images with the start_frame so the
                    # sidecar reflects the actual dispatched payload.
                    _per_shot_refs: list[str] = []
                    for _r in _shot.get("reference_images") or []:
                        _r_s = str(_r)
                        if _r_s not in _per_shot_refs:
                            _per_shot_refs.append(_r_s)
                    if start_frame is not None:
                        _sf_s = str(start_frame)
                        if _sf_s not in _per_shot_refs:
                            _per_shot_refs.append(_sf_s)
                    output_path = Path(str(r.output_path))
                    sidecar_path = output_path.with_suffix(output_path.suffix + ".json")
                    # R6 Phase 6 — plumb new params: pipeline locked to
                    # "r2v_multi_fallback" to differentiate from the BATCH
                    # site; downstream tooling can filter.
                    #
                    # Spec-review M2 (Opus): the per-shot fallback path at
                    # this site uses `runner.execute_multi_shot(...)` directly
                    # — no DispatchContext, so `ctx` is NEVER in scope here.
                    # The `"ctx" in locals()` guard always falls back to
                    # `None`, and `dispatch_path` resolves to `"unknown"`.
                    # Acceptable: this branch is dev-only retry, not a primary
                    # production surface.
                    _fb_dispatch_path = (
                        getattr(ctx.step_runner, "_dispatch_path", None)
                        if "ctx" in locals()
                        else None
                    )
                    sidecar_dict = populate_sidecar(
                        receipt=None,
                        payload={
                            "model": args.model,
                            "modality": "video_i2v",
                            "duration": _shot.get("_api_duration"),
                            "prompt": _seg.get("prompt"),
                            "reference_images": _per_shot_refs,
                            "video_path": str(output_path),
                            "cost_usd": float(getattr(r, "cost_usd", 0.0) or 0.0),
                        },
                        refs_used=_per_shot_refs,
                        tag=_derive_single_shot_tag(_shot),
                        project=args.project,
                        pipeline="r2v_multi_fallback",
                        dispatch_path=_fb_dispatch_path or "unknown",
                        shot_id=r.shot_id,
                        generation_params={
                            "duration": _shot.get("_api_duration"),
                            "aspect_ratio": args.aspect_ratio,
                        },
                    )
                    write_sidecar_dict(sidecar_path, sidecar_dict)
                    # R5 SYNTHESIS §1.8 — post-fire inspect-sidecars
                    # (per-shot fallback). Non-blocking; surfaces drift.
                    _post_fire_inspect_sidecar(args, sidecar_path)
                except Exception as _sc_err:
                    print(f"  WARNING: sidecar write failed for {r.shot_id}: {_sc_err}")
        print(f"Done in {elapsed:.0f}s, ${sum(r.cost_usd for r in results):.2f} total")

    elif args.mode == "coverage":
        # ── Coverage ──
        shot_data, ep_num, plan_data = get_plan_shot(
            args.project, args.shot, args.plan_dir
        )
        print(f"Coverage: {args.shot} (WS/MS/CU)")

        from recoil.pipeline._lib.prompt_engine import build_coverage_prompts

        has_audio = args.model in ("kling-v3", "kling-o3", "seeddance-2.0")
        prompts = build_coverage_prompts(shot_data, include_audio_cues=has_audio)
        for p in prompts:
            print(f"  {p['framing']}: {p['prompt'][:60]}...")

        paths = ProjectPaths.for_episode(args.project, ep_num)
        runner = StepRunner(
            store=store,
            paths=paths,
            validate_frames=not args.no_validate_frames,
            episode=ep_num,
        )

        # Build batch (same shot 3x) + sequence
        batch = []
        sequence = []
        for i, p in enumerate(prompts):
            shot_copy = dict(shot_data)
            shot_copy["_api_duration"] = p["duration"]
            batch.append(shot_copy)
            sequence.append(
                {"index": i + 1, "prompt": p["prompt"], "duration": p["duration"]}
            )

        # Inject @Element refs if elements are active
        if elements_payload and element_char_ids:
            from recoil.pipeline._lib.elements import ElementManager

            for entry in sequence:
                entry["prompt"] = ElementManager.inject_element_refs(
                    entry["prompt"],
                    element_char_ids,
                    element_char_ids,
                    has_location_element=has_location_element,
                    total_elements=total_elements,
                )

        start_frame = find_hero_frame(args.project, args.shot, store)
        print(f"Start frame: {start_frame}")

        if args.dry_run:
            return _dry_run_dump(
                f"multi_shot coverage {args.model} shot={args.shot}",
                {
                    "batch_size": len(batch),
                    "model": args.model,
                    "start_frame": str(start_frame) if start_frame else None,
                    "elements_payload": elements_payload,
                    "multi_prompt_sequence": sequence,
                },
            )
        t0 = time.time()
        results = runner.execute_multi_shot(
            batch=batch,
            multi_prompt_sequence=sequence,
            model=args.model,
            start_frame=start_frame,
            elements_payload=elements_payload,
        )
        elapsed = time.time() - t0

        # Phase 9 — register test dispatch (Coverage path)
        first_ok = next((r for r in results if r.success), None)
        if first_ok is not None:
            _total_cost = float(sum((r.cost_usd or 0.0) for r in results))
            _combined_prompt = "\n".join(
                f"[{p.get('framing', '')}] {p.get('prompt', '')}" for p in prompts
            )
            _register_test_dispatch_with_passstore(
                project=args.project,
                original_pass_id=args.shot,
                segment_shot_ids=[args.shot],
                model=args.model,
                prompt=_combined_prompt,
                output_path=str(first_ok.output_path) if first_ok.output_path else None,
                cost_usd=_total_cost,
                latency_seconds=elapsed,
                start_frame=str(start_frame) if start_frame else None,
                refs=element_char_ids if elements_payload else [],
                duration=int(sum(p.get("duration", 0) for p in prompts)),
                aspect_ratio=args.aspect_ratio,
            )

        for r in results:
            status = "OK" if r.success else "FAIL"
            print(f"  [{status}] {r.shot_id} → {r.output_path} (${r.cost_usd:.2f})")
        print(f"Done in {elapsed:.0f}s, ${sum(r.cost_usd for r in results):.2f} total")

    else:
        # ── Standard or Action single-shot ──
        shot_data, ep_num, plan_data = get_plan_shot(
            args.project, args.shot, args.plan_dir
        )
        duration = 10 if args.mode == "action" else args.duration
        print(f"{args.mode.title()}: {args.shot} ({duration}s)")
        print(f"Model: {args.model}")

        # R4 planner-data plumbing — stamp start_frame_path into shot_data.routing_data
        # before the strict builder reads it. Source order: --start-frame CLI override,
        # then find_hero_frame() from the shot's previz output. Planners don't yet emit
        # start_frame_path on the plan side; this dispatch-time stamping is the bridge.
        single_shot_start_frame: Optional[Path] = None
        if args.start_frame:
            single_shot_start_frame = Path(args.start_frame)
            if not single_shot_start_frame.exists():
                print(f"ERROR: --start-frame not found: {single_shot_start_frame}")
                sys.exit(1)
        else:
            try:
                single_shot_start_frame = find_hero_frame(
                    args.project, args.shot, store
                )
            except Exception:
                single_shot_start_frame = None
        if single_shot_start_frame is not None:
            shot_data.setdefault("routing_data", {})
            shot_data["routing_data"]["start_frame_path"] = str(single_shot_start_frame)
            print(f"Start frame: {single_shot_start_frame.name}")

        paths = ProjectPaths.for_episode(args.project, ep_num)
        runner = StepRunner(
            store=store,
            paths=paths,
            validate_frames=not args.no_validate_frames,
            episode=ep_num,
        )

        if args.prompt:
            prompt = args.prompt
        else:
            # Model-specific prompt selection
            if "wan" in args.model and "r2v" not in args.model:
                # Wan I2V — use wan_i2v or wan_between prompt
                from recoil.pipeline._lib.prompt_engine import build_wan_i2v_prompt

                has_end = args.end_frame is not None
                prompt = build_wan_i2v_prompt(
                    shot_data,
                    has_end_frame=has_end,
                    prompt_style=args.prompt_style,
                )
                print(
                    f"Prompt source: wan_{'between' if has_end else 'i2v'} (style={args.prompt_style})"
                )
            elif "veo" in args.model:
                from recoil.pipeline._lib.prompt_engine import (
                    build_video_prompt_from_plan,
                )

                bible = plan_data.get("bible_stub", {})
                config = (
                    args._project_config
                    if args._project_config is not None
                    else {"film_stock": "Kodak Vision3 500T"}
                )
                prompt = build_video_prompt_from_plan(
                    shot=shot_data,
                    bible=bible,
                    project_config=config,
                    episode=ep_num,
                )
                print("Prompt source: veo_t2v")
            elif "seeddance" in args.model or "seedance" in args.model:
                # A1 leak fix (R4) — route through the registry; do NOT fall
                # through to build_kling_i2v_prompt on ValueError. The
                # "no start frame" message was also a lie (frames existed).
                # If a seeddance single-shot lacks a start frame, that's a
                # planner bug worth raising on.
                from recoil.pipeline._lib.prompt_engine import get_builder

                bible = plan_data.get("bible_stub", {})
                config = (
                    args._project_config
                    if args._project_config is not None
                    else {"film_stock": "Kodak Vision3 500T"}
                )
                builder = get_builder(args.model, "i2v")
                if builder is None:
                    print(f"ERROR: no i2v builder registered for model {args.model!r}")
                    sys.exit(1)
                try:
                    prompt = builder(
                        shot=shot_data,
                        bible=bible,
                        project_config=config,
                        episode=ep_num,
                    )
                    print(f"Prompt source: {args.model} i2v (registry)")
                except ValueError as e:
                    print(
                        f"ERROR: seeddance i2v builder rejected shot {args.shot}: {e}"
                    )
                    sys.exit(1)
            else:
                from recoil.pipeline._lib.prompt_engine import build_kling_i2v_prompt

                has_audio = args.model in ("kling-v3", "kling-o3")
                prompt = build_kling_i2v_prompt(shot_data, include_audio_cues=has_audio)
                print("Prompt source: kling_i2v")

        # Inject @Element refs if elements are active
        if elements_payload and element_char_ids and not args.prompt:
            from recoil.pipeline._lib.elements import ElementManager

            prompt = ElementManager.inject_element_refs(
                prompt,
                element_char_ids,
                element_char_ids,
                has_location_element=has_location_element,
                total_elements=total_elements,
            )

        if args.dry_run:
            print(f"Prompt ({len(prompt.split())} words, {len(prompt)} chars):")
            print(prompt)
        else:
            print(f"Prompt ({len(prompt.split())} words): {prompt[:120]}...")

        # Start frame: explicit flag > hero frame lookup
        if args.start_frame:
            start_frame = Path(args.start_frame)
            if not start_frame.exists():
                print(f"ERROR: --start-frame not found: {start_frame}")
                sys.exit(1)
        else:
            start_frame = find_hero_frame(args.project, args.shot, store)
        print(f"Start frame: {start_frame}")

        # End frame (In Between / sandwich)
        end_frame = None
        if args.end_frame:
            end_frame = Path(args.end_frame)
            if not end_frame.exists():
                print(f"ERROR: --end-frame not found: {end_frame}")
                sys.exit(1)
            print(f"End frame: {end_frame}")

        if args.negative_prompt:
            if args.dry_run:
                print(f"Negative prompt ({len(args.negative_prompt)} chars):")
                print(args.negative_prompt)
            else:
                print(f"Negative prompt: {args.negative_prompt[:60]}...")
        elif "wan" in args.model:
            # Default Wan negative prompt
            args.negative_prompt = (
                "morphing, melting, teleporting, fast cuts, text, watermark"
            )
            print(f"Negative prompt (wan default): {args.negative_prompt}")

        if args.generate_audio:
            print("Audio generation: enabled")

        # Wan / HappyHorse R2V path — different execution method.
        # WanAdapter hosts both; routing is by model_id inside the adapter.
        if ("wan" in args.model or "happy-horse" in args.model) and "r2v" in args.model:
            if not args.wan_refs:
                print(
                    "ERROR: --wan-refs required for wan-2.7-r2v (e.g., --wan-refs SADIE)"
                )
                sys.exit(1)

            # Build R2V prompt from plan data
            if not args.prompt:
                from recoil.pipeline._lib.prompt_engine import build_wan_r2v_prompt

                bible = plan_data.get("bible_stub", {})
                config = (
                    args._project_config
                    if args._project_config is not None
                    else {"film_stock": "Kodak Vision3 500T"}
                )
                prompt = build_wan_r2v_prompt(
                    [shot_data],
                    bible,
                    config,
                    episode=ep_num,
                    multi_shots=False,
                )
                if args.dry_run:
                    print(
                        f"R2V prompt ({len(prompt.split())} words, "
                        f"{len(prompt)} chars):"
                    )
                    print(prompt)
                else:
                    print(
                        f"R2V prompt ({len(prompt.split())} words): {prompt[:120]}..."
                    )

            # Collect ref image paths via the shared CLI-edge resolver.
            # payload_assembly convergence Phase 5 (2026-05-25): inline
            # filename scanning replaced with _resolve_wan_character_refs.
            # Behavior preserved: frontal + three-quarter per character;
            # falls back to client-project layouts via _resolve_client_frontal
            # / _resolve_client_three_quarter (which scan refs/characters/
            # assets/identity/ paths). Per SYNTHESIS Condition 4, this
            # path keeps the direct runner.execute_wan_r2v(...) call —
            # video_wan_r2v is not registered as a dispatch modality.
            ref_paths = _resolve_wan_character_refs(args.project, args.wan_refs)
            print(f"R2V refs ({len(ref_paths)}): {[p.name for p in ref_paths]}")

            if args.dry_run:
                return _dry_run_dump(
                    f"wan_r2v {args.model} shot={args.shot}",
                    {
                        "shot_id": args.shot,
                        "model": args.model,
                        "duration": duration,
                        "reference_image_paths": [str(p) for p in ref_paths],
                        "multi_shots": False,
                    },
                    prompt=prompt,
                )
            t0 = time.time()
            result = runner.execute_wan_r2v(
                shot_id=args.shot,
                prompt=prompt,
                reference_image_paths=ref_paths,
                model=args.model,
                duration=duration,
                multi_shots=False,
            )
            elapsed = time.time() - t0
        else:
            # Standard I2V / T2V path (Kling, Veo, Wan I2V) — converged through
            # build_unified_payload (payload_assembly review follow-up,
            # 2026-05-25). Pre-built prompt + already-validated start_frame go
            # via std_ctx (the PayloadContext, not the DispatchContext `ctx`);
            # the assembler adds aspect_ratio, provider_hints, and
            # the inputs_snapshot / gate_results / prompt_layers telemetry.
            t0 = time.time()
            ctx = DispatchContext(
                caller_id="dispatch_cli",
                step_runner=runner,
                project=args.project,
                episode=_derive_episode_int(args.shot),
            )
            std_ctx = PayloadContext(
                project=args.project,
                modality="video_i2v",
                shot_id=args.shot,
                prompt=prompt,
                start_frame_path=Path(start_frame) if start_frame else None,
                end_frame_path=Path(end_frame) if end_frame else None,
                model_id=args.model,
                duration_s=float(duration),
                aspect_ratio=args.aspect_ratio,
                generate_audio=args.generate_audio,
                negative_prompt=args.negative_prompt,
                elements_payload=elements_payload,
                reference_image_paths=(
                    [Path(p) for p in veo_ref_paths] if veo_ref_paths else None
                ),
                episode=_derive_episode_id(args.shot),
            )
            std_payload = build_unified_payload(std_ctx)
            if args.dry_run:
                return _dry_run_dump(
                    f"per-shot {args.model} shot={args.shot}",
                    std_payload,
                    prompt=prompt,
                )
            receipt = dispatch("video_i2v", std_payload, context=ctx)
            result = receipt.run_result
            elapsed = time.time() - t0

        # cost_usd extraction handles both StepResult (wan_r2v branch — direct
        # attr) and RunResult (dispatch branch — metadata dict). The canonical
        # reader handles both shapes natively.
        cost_usd = read_cost_from_result(result)

        if result.success:
            # Phase 9 — register test dispatch (Standard or Action single-shot path)
            #
            # R6 Phase 8 (c1) leak fix:
            #   - UNCONDITIONALLY initialize _refs_for_record = [] (char-ID
            #     purge per Gemini R3 G1.3 catch — must happen before
            #     elements_payload branch).
            #   - When elements_payload["elements"] is present, populate
            #     _refs_for_record with the ACTUAL ref-image paths
            #     (frontal_image_url + reference_image_urls), NOT the
            #     char-ID strings.
            #   - Char-IDs ("JADE", "WREN") move to
            #     provenance.generation_params.element_char_ids per
            #     SYNTHESIS decision #6.
            _refs_for_record: list[str] = []
            _element_char_ids_for_provenance: list[str] = []
            if elements_payload and isinstance(elements_payload, dict):
                _elements = elements_payload.get("elements") or []
                for _el in _elements:
                    if not isinstance(_el, dict):
                        continue
                    _frontal = _el.get("frontal_image_url")
                    if _frontal:
                        _refs_for_record.append(str(_frontal))
                    for _r in _el.get("reference_image_urls") or []:
                        if _r:
                            _refs_for_record.append(str(_r))
                # Char-IDs preserved separately for audit/provenance only.
                _element_char_ids_for_provenance = list(element_char_ids or [])
            elif args.veo_refs:
                _refs_for_record = [
                    c.strip().upper() for c in args.veo_refs.split(",") if c.strip()
                ]
            elif args.wan_refs:
                _refs_for_record = [
                    c.strip().upper() for c in args.wan_refs.split(",") if c.strip()
                ]

            # A4 leak fix (R4) — write canonical sidecar on the single-shot
            # success path. Single-shot used to land with empty refs_used /
            # seed=null / gate_results={} / prompt_layers={}. Now both surfaces
            # (single-shot here, r2v_multi in step_runner.execute_pass) write
            # the same shape via populate_sidecar.
            if result.output_path:
                try:
                    from recoil.pipeline._lib.sidecar import (
                        populate_sidecar,
                        write_sidecar_dict,
                    )

                    output_path = Path(str(result.output_path))
                    sidecar_path = output_path.with_suffix(output_path.suffix + ".json")
                    sidecar_payload = {
                        "model": args.model,
                        "modality": "video_i2v",
                        "duration": duration,
                        "prompt": prompt,
                        "reference_images": (
                            [str(p) for p in (veo_ref_paths or [])] or _refs_for_record
                        ),
                        "video_path": str(output_path),
                    }
                    # R6 Phase 6 — plumb new params: pipeline branched on
                    # start_frame presence (i2v vs t2v). dispatch_path from
                    # ctx (CP-5 stamps it on step_runner).
                    _ss_pipeline = (
                        "video_i2v" if start_frame is not None else "video_t2v"
                    )
                    _ss_dispatch_path = (
                        getattr(ctx.step_runner, "_dispatch_path", None)
                        if "ctx" in locals()
                        else None
                    )
                    sidecar_dict = populate_sidecar(
                        receipt=receipt if "receipt" in locals() else None,
                        payload=sidecar_payload,
                        refs_used=_refs_for_record,
                        tag=_derive_single_shot_tag(shot_data),
                        project=args.project,
                        pipeline=_ss_pipeline,
                        dispatch_path=_ss_dispatch_path or "unknown",
                        shot_id=args.shot,
                        generation_params={
                            "duration": duration,
                            "aspect_ratio": args.aspect_ratio,
                            # R6 Phase 8 (c1) — char-IDs land here, NOT in refs_used.
                            "element_char_ids": _element_char_ids_for_provenance,
                        },
                    )
                    write_sidecar_dict(sidecar_path, sidecar_dict)
                    print(f"Sidecar: {sidecar_path}")
                    # R5 SYNTHESIS §1.8 — post-fire inspect-sidecars
                    # (single-shot path). Non-blocking; surfaces drift.
                    _post_fire_inspect_sidecar(args, sidecar_path)
                except Exception as _sc_err:
                    print(f"WARNING: sidecar write failed: {_sc_err}")

            _register_test_dispatch_with_passstore(
                project=args.project,
                original_pass_id=args.shot,
                segment_shot_ids=[args.shot],
                model=args.model,
                prompt=prompt,
                output_path=str(result.output_path) if result.output_path else None,
                cost_usd=cost_usd,
                latency_seconds=elapsed,
                start_frame=str(start_frame) if start_frame else None,
                # R6 Phase 8 (c1) — pass-store char-ID contract preservation:
                # the multi-shot path at dispatch_cli.py:2238 still passes
                # char-IDs as `refs=`. Keep the single-shot surface consistent
                # by passing char-IDs (when elements_payload provided them)
                # in preference to ref paths. Sidecar refs_used (above) gets
                # the resolved ref paths separately via populate_sidecar.
                refs=_element_char_ids_for_provenance or _refs_for_record,
                duration=duration,
                aspect_ratio=args.aspect_ratio,
            )

        status = "OK" if result.success else "FAIL"
        print(f"[{status}] {args.shot} → {result.output_path} (${cost_usd:.2f})")
        if result.error:
            print(f"Error: {result.error}")
        print(f"Done in {elapsed:.0f}s")


# Script entry point: call main() only when this file is executed directly
# (e.g. `python3 tools/test_via_steprunner.py ...`), not when it is imported
# as a module.
if __name__ == "__main__":
    main()
