# recoil/pipeline/_lib/dispatch_payload.py
"""Shared dispatch payload builder.

ONE canonical home for translating a CanonicalShot into the unified payload
dict that `dispatch("video_i2v", payload, context=ctx)` expects. All CLI and
narrative dispatch paths now flow through `build_unified_payload` via a
`PayloadContext`. Wired into run_overnight via
episode_runner.py:_build_workflow_for_beat.

Downstream contract (consumed by video_model_client._dict_to_unified, then
video_runner → step_runner.execute_video → fal adapter).

Output dict contract (consumed by video_model_client._dict_to_unified):
    {
      "shot_id":          str,
      "prompt":           str,
      "model":            str,
      "duration":         int,
      "aspect_ratio":     str,                      # required by _require_aspect_ratio
      "start_frame":      str | None,               # stringified path (json-safe; rehydrated downstream)
      "image_tail":       str | None,               # base64-encoded end_frame
      "reference_images": list[str] | None,
      "reference_videos": list[str] | None,
      "generate_audio":   bool,
      "negative_prompt":  str | None,
      "provider_hints":   dict | None,              # tier, endpoint, etc.
      "elements_payload": dict | None,              # kling-o3 only; None here
      "inputs_snapshot":  dict | None,
    }

For r2v_multi mode the output omits `start_frame`/`image_tail` and instead
populates `reference_images` (max 9). The clustering layer (Phase 4)
calls this builder once per Batch and passes `shot=batch.shots[0]`
+ extra batch metadata via the explicit kwargs.

Start frame resolution (Law 1 — two-level only, no convention scan):
    1. Explicit `start_frame_override: Path | None` kwarg (testing /
       manual dispatch).
    2. Per-shot state sidecar:
       projects/{project}/state/visual/shots/{shot_id}.json
       field `gate_results.hero_frame` (preferred) or `output_path`
       (fallback). Paths are stored relative to projects/{project}/.
    NO filename-convention glob scan. If state says approved but file
    is missing → raise FileNotFoundError immediately.
"""

from __future__ import annotations

import base64
import dataclasses
import itertools
import json
import logging
import os
import re
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

from recoil.core.atomic_write import jsonl_append_locked
from recoil.core.model_profiles import get_model
from recoil.core.model_profiles import get_profile
from recoil.core.paths import ProjectPaths, RECOIL_ROOT, projects_root
from recoil.core.ref_resolver import (
    resolve_character_bundle,
    resolve_location_refs,
    resolve_sheet_asset,
)
from recoil.core.ref_errors import MissingBoardRefError
from recoil.core.ref_gate import assert_refs_complete
from recoil.core.ref_types import ReferenceBundle
from recoil.pipeline._lib.author_strategies import (
    DETERMINISTIC_TEMPLATE,
    SHOT_SPEC,
    AuthorInputError,
    AuthorStrategy,
    StrategyResolutionError,
    require_strategy_inputs,
    resolve_strategy,
)
from recoil.pipeline._lib.cinema_loader import render_cinema_tokens
from recoil.pipeline._lib.filter_safety import (
    filter_safety_mode,
    lint_prompt,
    summarize_findings,
)
from recoil.pipeline._lib.opus_oauth import OpusOAuthError, call_opus_oauth
from recoil.pipeline._lib.plan_loader import CanonicalShot, CharacterEntry
from recoil.pipeline._lib.prompt_engine import (
    BindAssertionError,
    BoundPrompt,
    _get_seeddance_film_stock,
    _strip_focal_mm_tokens_for_model,
    bind_named_prose,
    get_builder,
)
from recoil.pipeline._lib.prose_validator import Severity, verify_authored_prose
from recoil.pipeline._lib.recoil_bridge import load_project_config
from recoil.pipeline._lib.shot_primitive import primitive_from_payload_context
from recoil.pipeline._lib.world_state_pass import derive_settings

logger = logging.getLogger(__name__)

_project_config_cache: dict[str, dict] = {}

# Process-local breaker state. Dispatch runs through the single-threaded
# StepRunner path, so this does not need locking.
_consecutive_author_call_failures: int = 0


def _reset_breaker_state() -> None:
    global _consecutive_author_call_failures

    _consecutive_author_call_failures = 0

# Pipeline-default model used when no override is supplied AND the plan
# does not specify a video_model. seeddance-2.0 is the default
# per `_recon_dispatch_table.md` and per JT's stated default.
DEFAULT_VIDEO_MODEL = "seeddance-2.0"
DEFAULT_TIER = "standard_720p"
DEFAULT_ASPECT_RATIO = "9:16"

# Modalities the build_dispatch_payload wrapper knows how to lower from a
# CanonicalShot. Narrative callers (episode_runner, audit_dispatch) only ever
# pass video_i2v or r2v_multi. CLI callers do NOT go through this constant —
# they construct PayloadContext directly with their own modality string and
# call build_unified_payload.
NARRATIVE_MODALITIES = ("video_i2v", "r2v_multi")

# Narrative shots default to generate_audio=True so the autonomous
# overnight surface matches the dispatch_cli default (see
# _recon_payload_trace.md Trace E and _recon_run_overnight_retrace.md
# Trace 6). Per-shot overrides via shot.raw["generate_audio"] still
# beat this default.
NARRATIVE_DEFAULT_GENERATE_AUDIO = True
_FILTER_SAFETY_LOG_PATH = (
    RECOIL_ROOT / "_dispatch_logs" / "filter_safety_lint.jsonl"
)


class DispatchPayloadError(Exception):
    """Raised when the builder cannot produce a valid payload."""


class PromptTooLongError(DispatchPayloadError):
    """Raised when an assembled prompt exceeds the selected provider cap."""


class AuthorCallError(RuntimeError):
    """Raised when the prose author model fails or returns malformed output."""


@dataclass
class AuthorPromptResult:
    """Prompt text plus metadata from the author-aware build path."""

    prompt: str
    modality: str
    strategy: str
    payload_refs: dict[str, Any]
    fallback: bool = False


def _utc_now_iso8601() -> str:
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def _apply_filter_safety_shadow(ctx: PayloadContext, payload: dict[str, Any]) -> None:
    """Attach shadow filter-safety summary and append the joinable lint ledger."""
    try:
        mode = filter_safety_mode(ctx.project)
        if mode == "off":
            return

        findings = lint_prompt(payload["prompt"])
        if not findings:
            return

        summary = summarize_findings(findings)
        payload["filter_safety"] = summary
        jsonl_append_locked(
            _FILTER_SAFETY_LOG_PATH,
            {
                "ts": _utc_now_iso8601(),
                "project": ctx.project,
                "shot_id": payload.get("shot_id") or ctx.shot_id,
                "model": payload.get("model"),
                "mode": mode,
                "warn_count": summary["warn"],
                "info_count": summary["info"],
                "findings": [asdict(finding) for finding in findings],
            },
        )
    except Exception as exc:  # noqa: BLE001
        payload.pop("filter_safety", None)
        try:
            jsonl_append_locked(
                _FILTER_SAFETY_LOG_PATH,
                {
                    "lint_error": str(exc),
                    "ts": _utc_now_iso8601(),
                    "project": ctx.project,
                    "shot_id": payload.get("shot_id") or ctx.shot_id,
                    "model": payload.get("model"),
                    "mode": "unknown-or-shadow",
                },
            )
        except Exception:  # noqa: BLE001
            logger.exception("dispatch_payload: filter-safety lint logging failed")


# ============================================================================
# PayloadContext — upstream payload assembly context (payload_assembly convergence, 2026-05-25)
# ============================================================================
# See SYNTHESIS at consultations/recoil/payload-assembly-convergence-2026-05-25/
# CLI callers populate pre-resolved fields. Narrative callers populate shot/
# batch_shots and let build_unified_payload resolve what's missing from state.


@dataclass
class PayloadContext:
    """Upstream payload assembly context."""

    # Required
    project: str
    modality: str
    shot_id: str

    # Pre-resolved (CLI sets these; narrative leaves None for resolution)
    prompt: Optional[str] = None
    start_frame_path: Optional[Path] = None
    end_frame_path: Optional[Path] = None
    reference_image_paths: Optional[list[Path]] = None
    reference_video_paths: Optional[list[Path]] = None
    negative_prompt: Optional[str] = None
    board_ref_path: Optional[str] = None
    board_gated: bool = False

    # Generation parameters
    model_id: Optional[str] = None
    duration_s: Optional[float] = None
    aspect_ratio: str = "9:16"
    generate_audio: bool = True

    # Modality-specific
    elements_payload: Optional[dict[str, Any]] = None
    multi_shots: bool = False
    image_urls_payload: Optional[list[str]] = None  # Kling-O3 @Image tokens channel

    # Narrative context (plan path only — CLI never sets these)
    shot: Optional[CanonicalShot] = None
    batch_shots: Optional[list[CanonicalShot]] = None
    prompt_directive: Optional[str] = None

    # Routing / provenance
    tier: Optional[str] = None
    endpoint: Optional[str] = None
    episode: Optional[str] = None
    bible: Optional[dict] = None

    # Explicit author-strategy override (REC-111). None → resolver uses its own
    # precedence (RECOIL_AUTHOR_STRATEGY env / primitive.strategy / ref shape).
    strategy_override: Optional[str] = None


def _resolve_board_artifact(project: str, board_ref_path) -> str | None:
    """Resolve a Beat.board artifact (project-relative by contract) to an
    absolute path. Single resolver for the live AND audit payload paths."""
    if not board_ref_path:
        return None
    board_path = Path(str(board_ref_path)).expanduser()
    if not board_path.is_absolute():
        board_path = ProjectPaths.for_project(project).project_root / board_path
    return str(board_path)


def _resolve_board_ref_path(ctx: PayloadContext) -> str | None:
    return _resolve_board_artifact(ctx.project, ctx.board_ref_path)


def _insert_board_line(prompt: str, *, board_index: int, segment_count: int) -> str:
    board_line = (
        f"The attached storyboard @Image{board_index} defines the framing and "
        f"composition of the {segment_count} shots, panels 1-{segment_count} in "
        "order — match each shot to its panel."
    )
    if "\n" not in prompt:
        return f"{prompt.rstrip()}\n{board_line}"
    first, rest = prompt.split("\n", 1)
    return f"{first}\n{board_line}\n{rest}"


def author_pass(
    primitive,
    strategy: AuthorStrategy,
    *,
    bible: dict,
    project_config: dict,
    ref_manifest: dict,
) -> str:
    """Author raw real-name prose for a primitive using the prose-author role."""

    require_strategy_inputs(primitive, strategy)
    try:
        system_prompt = strategy.system_prompt_path.read_text(encoding="utf-8")
    except Exception as exc:
        raise AuthorCallError(
            f"{primitive.shot_id}: could not read strategy prompt "
            f"{strategy.system_prompt_path}"
        ) from exc

    author_model = get_model("prose_author", "text")
    user_prompt = _render_author_user_prompt(
        primitive,
        strategy,
        bible=bible,
        project_config=project_config,
        ref_manifest=ref_manifest,
    )
    text = _call_author_model(author_model, system_prompt, user_prompt)
    if not isinstance(text, str) or not text.strip():
        raise AuthorCallError(
            f"{primitive.shot_id}: prose author returned empty or malformed output"
        )
    return text.strip()


def _render_author_user_prompt(
    primitive,
    strategy: AuthorStrategy,
    *,
    bible: dict,
    project_config: dict,
    ref_manifest: dict,
) -> str:
    _segs = getattr(primitive, "timing_segments", None)
    if _segs is None and isinstance(primitive, dict):
        _segs = primitive.get("timing_segments")
    _beat_spans = _canonical_beat_spans(primitive)
    _beat_skeleton = [
        {
            "index": i,
            "timecode": tc,
            "duration_s": max(0.0, end_s - start_s),
            "source_shot_id": (
                seg.get("source_shot_id") if isinstance(seg, dict)
                else getattr(seg, "source_shot_id", None)
            ),
            "shot_type": (
                seg.get("shot_type") if isinstance(seg, dict)
                else getattr(seg, "shot_type", None)
            ),
            "setting": (
                seg.get("setting") if isinstance(seg, dict)
                else getattr(seg, "setting", None)
            ),
        }
        for i, ((start_s, end_s, tc), seg) in enumerate(zip(_beat_spans, _segs or []))
    ]
    payload = {
        "strategy": strategy.name,
        "beat_skeleton": _beat_skeleton,
        "primitive": dataclasses.asdict(primitive)
        if dataclasses.is_dataclass(primitive)
        else primitive,
        "ref_manifest": ref_manifest or {},
        "bible": _sanitize_author_context(bible or {}),
        "project_config": _sanitize_author_context(project_config or {}),
        "constraints": {
            "no_image_tokens": "Never write @Image1 or @ImageN.",
            "names": "Use real character names; the binder converts names to refs.",
        },
    }
    return json.dumps(payload, ensure_ascii=True, indent=2, default=str)


def _sanitize_author_context(value: Any) -> Any:
    """Keep generation-model routing identifiers out of the author prompt."""

    redacted_strings = (
        "seeddance-2.0",
        "kling-v3",
        "kling-o3",
        "wan-2.7-i2v",
        "veo-3.1",
    )
    if isinstance(value, dict):
        sanitized: dict[Any, Any] = {}
        for key, child in value.items():
            key_text = str(key).lower()
            if key_text in {"model", "models", "video_model", "default_models"}:
                continue
            sanitized[key] = _sanitize_author_context(child)
        return sanitized
    if isinstance(value, list):
        return [_sanitize_author_context(child) for child in value]
    if isinstance(value, tuple):
        return [_sanitize_author_context(child) for child in value]
    if isinstance(value, str):
        text = value
        for needle in redacted_strings:
            text = text.replace(needle, "[video-model-redacted]")
        return text
    return value


def _call_author_model(model_id: str, system_prompt: str, user_prompt: str) -> str:
    try:
        return call_opus_oauth(model_id, system_prompt, user_prompt).strip()
    except OpusOAuthError as exc:
        raise AuthorCallError(f"prose author call failed: {exc}") from exc
    except AuthorCallError:
        raise
    except Exception as exc:
        raise AuthorCallError(f"prose author call failed: {exc}") from exc


_DP_TIMECODE_RE = re.compile(r"\[\d+:\d{2}\s*-\s*\d+:\d{2}\]")


def _dp_fmt_tc(start_s: float, end_s: float) -> str:
    def _mmss(t: float) -> str:
        m, s = divmod(int(round(t)), 60)
        return f"{m}:{s:02d}"

    return f"[{_mmss(start_s)}-{_mmss(end_s)}]"


def _segment_duration_s(segment: Any) -> float:
    value = (
        segment.get("duration_s")
        if isinstance(segment, dict)
        else getattr(segment, "duration_s", None)
    )
    try:
        return float(value or 0)
    except (TypeError, ValueError):
        return 0.0


def _primitive_target_duration_s(primitive: Any) -> float | None:
    value = getattr(primitive, "target_editorial_duration_s", None)
    if value is None and isinstance(primitive, dict):
        value = primitive.get("target_editorial_duration_s")
    try:
        duration = float(value)
    except (TypeError, ValueError):
        return None
    return duration if duration > 0 else None


def _canonical_beat_spans(primitive: Any) -> list[tuple[float, float, str]]:
    """Canonical spans per timing segment, normalized to target duration.

    SSOT for beat timecodes — used both to foreground the skeleton to the author
    and to normalize the authored output. The LLM never computes timecodes.
    """
    segs = getattr(primitive, "timing_segments", None)
    if segs is None and isinstance(primitive, dict):
        segs = primitive.get("timing_segments")
    durations = [_segment_duration_s(s) for s in segs or []]
    target_duration = _primitive_target_duration_s(primitive)
    duration_sum = sum(durations)
    if target_duration is not None and durations:
        if duration_sum > 0:
            scale = target_duration / duration_sum
            durations = [d * scale for d in durations]
        else:
            durations = [target_duration / len(durations) for _ in durations]

    out: list[tuple[float, float, str]] = []
    t = 0.0
    for i, d in enumerate(durations):
        if target_duration is not None and i == len(durations) - 1:
            end = target_duration
        else:
            end = t + d
        out.append((t, end, _dp_fmt_tc(t, end)))
        t = end
    return out


def _canonical_beat_timecodes(primitive: Any) -> list[str]:
    """Canonical [m:ss-m:ss] per timing segment."""

    return [tc for _start_s, _end_s, tc in _canonical_beat_spans(primitive)]


def _normalize_authored_timecodes(authored: str, primitive: Any) -> str:
    """Overwrite the authored beat timecodes with the canonical skeleton ones,
    positionally, so timecodes are 100% structural regardless of what the LLM
    emitted. If the beat COUNT does not match the skeleton, return unchanged so
    the verifier blocks on count (the one constraint the author must satisfy).
    """
    canon = _canonical_beat_timecodes(primitive)
    if not canon:
        return authored
    matches = list(_DP_TIMECODE_RE.finditer(authored or ""))
    if len(matches) != len(canon):
        return authored
    out = authored
    for m, tc in zip(reversed(matches), reversed(canon)):
        out = out[: m.start()] + tc + out[m.end() :]
    return out


def _has_blocking_verify_result(results: list[Any]) -> bool:
    for result in results or []:
        severity = getattr(result, "severity", None)
        if severity is None and isinstance(result, dict):
            severity = result.get("severity")
        name = getattr(severity, "name", severity)
        value = getattr(severity, "value", None)
        if str(name).upper() == "BLOCK" or str(value).upper() == "BLOCK":
            return True
    return False


def _format_verify_failures(results: list[Any]) -> str:
    parts: list[str] = []
    for result in results or []:
        severity = getattr(result, "severity", None)
        if severity is None and isinstance(result, dict):
            severity = result.get("severity")
        message = getattr(result, "message", None)
        if message is None and isinstance(result, dict):
            message = result.get("message")
        if message:
            parts.append(f"{getattr(severity, 'name', severity)}: {message}")
    return "\n".join(parts)


def _log_verify_results(results: list[Any], *, primitive_id: str, strategy: str) -> None:
    for result in results or []:
        severity = getattr(result, "severity", None)
        message = getattr(result, "message", None)
        check = getattr(result, "check", None)
        severity_name = str(getattr(severity, "name", severity)).upper()
        if severity_name == "WARN":
            logger.warning(
                "prose_verify_warn primitive_id=%s strategy=%s check=%s message=%s",
                primitive_id,
                strategy,
                check,
                message,
            )
        elif severity_name == "BLOCK":
            logger.info(
                "prose_verify_block primitive_id=%s strategy=%s check=%s message=%s",
                primitive_id,
                strategy,
                check,
                message,
            )


def _log_prose_author_fallback(
    *,
    reason: str,
    strategy: str,
    primitive_id: str,
    video_model: str,
    modality: str,
) -> None:
    logger.warning(
        "prose_author_fallback reason=%s strategy=%s primitive_id=%s "
        "video_model=%s modality=%s",
        reason,
        strategy,
        primitive_id,
        video_model,
        modality,
    )


def _project_config_for(project: str) -> dict:
    if project not in _project_config_cache:
        _project_config_cache[project] = load_project_config(project)
    return _project_config_cache[project]


def _prompt_engine_modality(modality: str) -> str:
    if modality == "r2v_multi":
        return "r2v_multi"
    if modality == "video_i2v":
        return "i2v"
    raise DispatchPayloadError(
        f"Cannot resolve prompt from shot for modality {modality!r}; "
        f"caller must pre-populate PayloadContext.prompt for this modality."
    )


def _build_deterministic_template_prompt(
    ctx: PayloadContext,
    *,
    model_id: str,
    ref_manifest: dict,
    segment_timestamps: Optional[list[float]],
    start_frame: Any,
    bible: dict,
    project_config: dict,
) -> str:
    prompt_modality = _prompt_engine_modality(ctx.modality)
    try:
        builder = get_builder(model_id, prompt_modality)
    except KeyError as e:
        raise DispatchPayloadError(
            f"No prompt builder for ({model_id!r}, {prompt_modality!r}): {e}"
        ) from e

    if ctx.modality == "video_i2v" and start_frame is not None and ctx.shot is not None:
        shot_raw_for_builder: dict[str, Any] = {**ctx.shot.raw}
        routing_data = dict(shot_raw_for_builder.get("routing_data") or {})
        routing_data["start_frame_path"] = str(start_frame)
        shot_raw_for_builder["routing_data"] = routing_data
    else:
        shot_raw_for_builder = ctx.shot.raw if ctx.shot is not None else {}

    if ctx.modality == "r2v_multi" and ctx.batch_shots:
        return builder(
            [s.raw for s in ctx.batch_shots],
            bible or {},
            project_config,
            ref_manifest=ref_manifest,
            segment_timestamps=segment_timestamps,
        )
    return builder(shot_raw_for_builder, bible or {}, project_config)


_DIRECTED_PROSE_CINEMA_MODALITIES = frozenset({"r2v_multi"})


def _resolve_provider_cap(
    model_id: str,
    *,
    reference_images: Optional[list[Any]] = None,
    generate_audio: bool = False,
    negative_prompt: Optional[str] = None,
    image: Any = None,
    image_tail: Any = None,
    resolution: str = "720p",
    tier: str | None = None,
    modality: str | None = None,
) -> int | None:
    from recoil.execution.providers.base import (
        ProviderCapabilityError,
        UnifiedVideoPayload,
    )
    from recoil.execution.providers import registry

    if modality is not None and modality not in NARRATIVE_MODALITIES:
        return None

    cap_payload = UnifiedVideoPayload(
        prompt="",
        resolution=resolution or "720p",
        image=image,
        image_tail=image_tail,
        reference_images=list(reference_images or []),
        generate_audio=bool(generate_audio),
        negative_prompt=negative_prompt,
        model_id=model_id,
    )
    try:
        adapter, _resolved_tier = registry.resolve_adapter(
            model_id, cap_payload, tier=tier
        )
    except ProviderCapabilityError:
        # Budget preflight must not become a duplicate capability gate. If a
        # payload asks for an unsupported non-prompt capability, still read the
        # primary provider's adapter-owned cap when available.
        entry = registry.load_strategy().get(model_id, {}) or {}
        provider_id = entry.get("primary")
        if not provider_id:
            return None
        adapter = registry._load_adapter(provider_id)
    return adapter.max_prompt_chars


def _enforce_provider_prompt_cap(
    prompt: str,
    *,
    model_id: str,
    reference_images: Optional[list[Any]],
    generate_audio: bool,
    negative_prompt: Optional[str],
    image: Any,
    image_tail: Any,
    resolution: str,
    tier: str | None,
    shot_id: str,
    modality: str | None = None,
) -> None:
    cap = _resolve_provider_cap(
        model_id,
        reference_images=reference_images,
        generate_audio=generate_audio,
        negative_prompt=negative_prompt,
        image=image,
        image_tail=image_tail,
        resolution=resolution,
        tier=tier,
        modality=modality,
    )
    if cap is not None and len(prompt) > cap:
        raise PromptTooLongError(
            f"{shot_id}: prompt is {len(prompt)} chars, over provider cap {cap}"
        )


def _append_directed_prose_cinema_style(
    prompt: str,
    *,
    ctx: PayloadContext,
    modality: str,
    model_id: str,
    project_config: dict,
) -> str:
    """Attach deterministic cinema look tokens to authored directed prose."""

    if modality not in _DIRECTED_PROSE_CINEMA_MODALITIES or ctx.shot is None:
        return prompt

    cinema_block = ctx.shot.cinematography or ctx.shot.raw.get("cinematography") or {}
    if not isinstance(cinema_block, dict):
        cinema_block = {}

    cinema_tokens = render_cinema_tokens(
        mode_id=cinema_block.get("mode") or project_config.get("cinema_mode"),
        model_id=model_id,
        shot_overrides=cinema_block.get("overrides"),
    )

    style_parts: list[str] = []
    if cinema_tokens:
        style_parts.append(
            _strip_focal_mm_tokens_for_model(
                str(cinema_tokens).lower(),
                model_id,
            )
        )
    else:
        film_stock = _get_seeddance_film_stock(project_config or {})
        if film_stock:
            style_parts.append(f"shot on {str(film_stock).lower()}")

    if not style_parts:
        return prompt

    style_parts.append("Cinematic, photorealistic")
    style = ". ".join(part.strip().rstrip(".") for part in style_parts if part.strip())
    if not style:
        return prompt
    return f"{prompt.rstrip()}\n\nStyle: {style}."


def _append_prompt_directive(prompt: str, prompt_directive: str | None) -> str:
    directive = (prompt_directive or "").strip()
    if not directive:
        return prompt
    if directive in prompt:
        return prompt
    return f"{prompt.rstrip()}\n\n{directive}"


def _inject_world_state_settings(ctx: PayloadContext, primitive):
    """Derive per-segment `setting` lines and return a CLONE carrying them.

    REC-111 world-state pass. Registry-load failure is fail-soft (mirrors the
    LLM call): the original primitive is returned unchanged and `setting` is
    left absent. `derive_settings` is itself fail-soft for the LLM call, so this
    helper never raises to the dispatch path. The original primitive object is
    never mutated — the derived segments flow only through the returned clone.
    """

    sublocations: Optional[dict] = None
    if primitive.location_id:
        try:
            from recoil.pipeline._lib import sublocation_registry

            project_paths = ProjectPaths.for_project(ctx.project)
            registry = sublocation_registry.load_location_registry(
                project_paths, primitive.location_id
            )
        except Exception:  # noqa: BLE001 — fail soft exactly like the LLM call
            logger.warning(
                "world_state_pass_skipped reason=registry_load_failed primitive_id=%s",
                primitive.shot_id,
            )
            return primitive
        if registry:
            sublocations = registry.get("sublocations") or {}

    new_segments = derive_settings(
        primitive.timing_segments,
        location_id=primitive.location_id,
        char_ids=primitive.char_ids,
        sublocations=sublocations,
    )
    logger.info(
        "world_state_pass active primitive_id=%s segments=%d",
        primitive.shot_id,
        len(new_segments),
    )
    return dataclasses.replace(primitive, timing_segments=new_segments)


def _verify_spatial_pre_spend(ctx: PayloadContext, primitive) -> None:
    """Run the REC-111 spatial validator before any payload is built.

    Pre-spend gate for r2v_multi: BLOCK results raise ``DispatchPayloadError``
    listing the failing checks (env ``RECOIL_SPATIAL_OVERRIDE=1`` downgrades
    BLOCK→WARN, logged loudly); WARN results log via the existing prose-verify
    warning logger. An undecomposed location (no registry) produces no results
    and is silent. Caller invokes this only when ``primitive.location_id`` is
    truthy, on the strategy-agnostic r2v_multi build point.
    """
    from recoil.pipeline._lib.spatial_validator import verify_spatial
    from recoil.pipeline._lib.sublocation_registry import (
        load_location_registry,
        location_base_dir,
    )

    project_paths = ProjectPaths.for_project(ctx.project)
    registry = load_location_registry(project_paths, primitive.location_id)
    base_dir = location_base_dir(project_paths, primitive.location_id)
    results = verify_spatial(primitive, registry=registry, base_dir=base_dir)
    if not results:
        return

    warns = [r for r in results if r.severity == Severity.WARN]
    blocks = [r for r in results if r.severity == Severity.BLOCK]
    _log_verify_results(warns, primitive_id=primitive.shot_id, strategy="spatial")

    if not blocks:
        return

    detail = "; ".join(f"{r.check}: {r.message}" for r in blocks)
    if os.environ.get("RECOIL_SPATIAL_OVERRIDE") == "1":
        logger.warning(
            "RECOIL_SPATIAL_OVERRIDE=1 — downgrading %d spatial BLOCK(s) to WARN "
            "for %s: %s",
            len(blocks),
            primitive.shot_id,
            detail,
        )
        return
    raise DispatchPayloadError(
        f"spatial pre-spend validation BLOCKED for {primitive.shot_id}: {detail}"
    )


def _build_author_aware_prompt(
    ctx: PayloadContext,
    *,
    model_id: str,
    ref_manifest: dict,
    ref_images: Optional[list[str]] = None,
    start_frame: Any = None,
    end_frame: Any = None,
    generate_audio: bool = False,
    negative_prompt: Optional[str] = None,
    resolution: str = "720p",
    tier: str | None = None,
    segment_timestamps: Optional[list[float]] = None,
    primitive_segment_timestamps: Optional[list[Any]] = None,
    bible: Optional[dict] = None,
    project_config: Optional[dict] = None,
) -> AuthorPromptResult:
    """Build a prompt through strategy resolution, authoring, verify, and bind."""

    global _consecutive_author_call_failures

    if ctx.shot is None:
        _reset_breaker_state()
        raise DispatchPayloadError(
            "PayloadContext requires either ctx.prompt OR ctx.shot to build a prompt."
        )

    cfg = project_config if project_config is not None else _project_config_for(ctx.project)
    prompt_bible = bible or {}
    try:
        primitive = primitive_from_payload_context(
            ctx,
            ref_manifest=ref_manifest,
            start_frame=start_frame,
            end_frame=end_frame,
            segment_timestamps=primitive_segment_timestamps or segment_timestamps,
        )
        # REC-111 Phase 5: spatial pre-spend validator. Strategy-agnostic — runs
        # for every r2v_multi authored payload (incl. default directed_prose),
        # before resolve_strategy picks shot_spec vs directed_prose. Skipped when
        # the primitive carries no location_id (undecomposed/locationless).
        if ctx.modality == "r2v_multi" and primitive.location_id:
            _verify_spatial_pre_spend(ctx, primitive)
        modality, strategy = resolve_strategy(
            primitive,
            explicit=ctx.strategy_override,
            model_id=model_id,
            requested_modality=ctx.modality,
        )
    except StrategyResolutionError as e:
        _reset_breaker_state()
        raise DispatchPayloadError(str(e)) from e
    except Exception:
        _reset_breaker_state()
        raise

    def deterministic(reason: str | None = None) -> AuthorPromptResult:
        if reason != "author_call":
            _reset_breaker_state()
        if reason:
            _log_prose_author_fallback(
                reason=reason,
                strategy=strategy.name,
                primitive_id=primitive.shot_id,
                video_model=model_id,
                modality=modality,
            )
        prompt = _build_deterministic_template_prompt(
            ctx,
            model_id=model_id,
            ref_manifest=ref_manifest,
            segment_timestamps=segment_timestamps,
            start_frame=start_frame,
            bible=prompt_bible,
            project_config=cfg,
        )
        ctx.prompt = prompt
        return AuthorPromptResult(
            prompt=prompt,
            modality=modality,
            strategy=DETERMINISTIC_TEMPLATE,
            payload_refs={},
            fallback=reason is not None,
        )

    if strategy.name == DETERMINISTIC_TEMPLATE:
        return deterministic()

    # REC-111 world-state setting pass (env-gated, default OFF for this build).
    # The clone carries the derived per-segment settings into author_pass,
    # _normalize_authored_timecodes, and verify_authored_prose; the original
    # primitive object is never mutated.
    if (
        modality == "r2v_multi"
        and strategy.name == SHOT_SPEC
        and os.environ.get("RECOIL_WORLD_STATE_PASS") == "1"
    ):
        primitive = _inject_world_state_settings(ctx, primitive)

    cap = _resolve_provider_cap(
        model_id,
        reference_images=ref_images,
        generate_audio=generate_audio,
        negative_prompt=negative_prompt,
        image=start_frame,
        image_tail=end_frame,
        resolution=resolution,
        tier=tier,
        modality=modality,
    )
    max_attempts = 3 if cap is not None else 2
    retry_failure_text: str | None = None
    bound: BoundPrompt | None = None

    try:
        for attempt in range(1, max_attempts + 1):
            author_project_config = cfg
            if retry_failure_text:
                author_project_config = {
                    **(cfg or {}),
                    "prose_author_retry_failures": retry_failure_text,
                }
            authored = author_pass(
                primitive,
                strategy,
                bible=prompt_bible,
                project_config=author_project_config,
                ref_manifest=ref_manifest,
            )
            authored = _normalize_authored_timecodes(authored, primitive)
            try:
                verify_results = verify_authored_prose(authored, primitive, strategy)
            except Exception:
                return deterministic("verify_error")
            _log_verify_results(
                verify_results,
                primitive_id=primitive.shot_id,
                strategy=strategy.name,
            )
            if _has_blocking_verify_result(verify_results):
                retry_failure_text = _format_verify_failures(verify_results)
                if attempt >= max_attempts:
                    return deterministic("verify_block")
                logger.info(
                    "prose author verify BLOCK for %s/%s; retrying: %s",
                    primitive.shot_id,
                    strategy.name,
                    retry_failure_text,
                )
                continue

            bound = bind_named_prose(
                authored,
                primitive,
                ref_manifest,
                modality=modality,
            )
            prompt_text = _append_directed_prose_cinema_style(
                bound.text,
                ctx=ctx,
                modality=modality,
                model_id=model_id,
                project_config=cfg,
            )
            if cap is None or len(prompt_text) <= cap:
                break
            overage = len(prompt_text) - cap
            if attempt >= max_attempts:
                raise PromptTooLongError(
                    f"{primitive.shot_id}: prompt is {len(prompt_text)} chars, "
                    f"{overage} over provider cap {cap} after {max_attempts} attempts"
                )
            logger.warning(
                "r2v prompt over provider cap (%d): attempt %d/3, %d chars; "
                "re-authoring",
                cap,
                attempt,
                len(prompt_text),
            )
            retry_failure_text = (
                f"prompt was {len(prompt_text)} chars, {overage} over the {cap} cap "
                "— tighten the action prose; keep verbatim dialogue + every "
                "timecode + the style block"
            )
        else:
            raise DispatchPayloadError(
                f"{primitive.shot_id}: prose author did not produce a prompt"
            )
    except AuthorInputError:
        return deterministic("author_input")
    except AuthorCallError:
        _consecutive_author_call_failures += 1
        if _consecutive_author_call_failures >= 3:
            raise DispatchPayloadError(
                "prose author systemic failure: 3 consecutive author_call errors "
                "— halting before further spend (check Claude transport/auth); "
                f"last shot {primitive.shot_id}"
            )
        return deterministic("author_call")
    except BindAssertionError:
        return deterministic("bind_assertion")
    except Exception:
        _reset_breaker_state()
        raise

    _reset_breaker_state()
    ctx.prompt = prompt_text
    return AuthorPromptResult(
        prompt=prompt_text,
        modality=modality,
        strategy=strategy.name,
        payload_refs=dict(bound.payload_refs or {}),
        fallback=False,
    )


def _author_bound_i2v_refs(
    prompt_result: AuthorPromptResult | None,
    *,
    shot_id: str,
) -> dict[str, Any]:
    """Return validated i2v refs from the author/bind path, if it was used."""

    if (
        prompt_result is None
        or prompt_result.modality != "video_i2v"
        or prompt_result.strategy == DETERMINISTIC_TEMPLATE
        or prompt_result.fallback
    ):
        return {}
    refs = dict(prompt_result.payload_refs or {})
    if not refs.get("start_frame"):
        raise DispatchPayloadError(
            f"{shot_id}: i2v bound prompt missing payload_refs.start_frame"
        )
    return refs


def _serialize_image_tail_ref(
    value: Any,
    *,
    shot_id: str,
    require_local_file: bool,
) -> str:
    """Lower a bound image_tail ref to the live payload string."""

    if isinstance(value, bytes):
        return base64.b64encode(value).decode()
    if isinstance(value, bytearray):
        return base64.b64encode(bytes(value)).decode()

    text = str(value)
    if text.startswith(("http://", "https://", "data:")):
        return text

    path = Path(text).expanduser()
    try:
        if path.exists():
            return base64.b64encode(path.read_bytes()).decode()
    except OSError as exc:
        if require_local_file:
            raise FileNotFoundError(
                f"{shot_id}: image_tail {path} is unreadable: {exc}"
            ) from exc

    if require_local_file:
        raise FileNotFoundError(f"{shot_id}: image_tail {path} does not exist")
    return text


def _audit_requires_author_prompt(modality: str, model_id: str) -> bool:
    """REC-72 audit must not hide failures on the live authoring tuples."""

    return (model_id, modality) in {
        ("seeddance-2.0", "r2v_multi"),
        ("kling-v3", "video_i2v"),
    }


def build_unified_payload(ctx: PayloadContext) -> dict[str, Any]:
    """Canonical upstream payload assembler (payload_assembly convergence, 2026-05-25).

    Resolution priority: explicit ctx fields first, then resolve from
    ctx.shot / project state. CLI callers pre-resolve everything;
    narrative callers provide ctx.shot and let this function resolve.

    Behavior matches the legacy build_dispatch_payload for narrative paths
    (Phase 2 parity test enforces this). New CLI paths (Phase 3-5) pass
    pre-resolved fields and bypass the from-shot resolution branches.
    """
    # === 1. Model resolution: explicit override → shot.video_model → DEFAULT ===
    if ctx.model_id:
        model_id = ctx.model_id
    elif ctx.shot and ctx.shot.video_model:
        model_id = ctx.shot.video_model
    else:
        model_id = DEFAULT_VIDEO_MODEL

    try:
        profile = get_profile(model_id)
    except KeyError as e:
        raise DispatchPayloadError(
            f"Unknown model_id {model_id!r} (no profile in model_profiles.json)"
        ) from e

    tier = ctx.tier or DEFAULT_TIER
    resolution = "720p"

    min_dur = profile.get("min_duration_seconds")
    max_dur = profile.get("max_duration_seconds")
    segment_shot_ids: Optional[list[str]] = None
    expected_segment_timestamps: Optional[list[tuple[float, float]]] = None
    segment_timestamps: Optional[list[float]] = None

    # r2v_multi requires batch_shots — explicit contract for direct callers.
    # The build_dispatch_payload wrapper already guards this for narrative
    # paths; CLI / direct PayloadContext callers must also be checked here
    # so a stray "r2v_multi" + no batch silently falls into single-shot math.
    if ctx.modality == "r2v_multi" and not ctx.batch_shots:
        raise DispatchPayloadError("r2v_multi modality requires non-empty batch_shots")

    # === 2. Duration clamping (mirrors build_dispatch_payload behavior) ===
    if ctx.modality == "r2v_multi" and ctx.batch_shots:
        raw_durs = [float(s.duration_s or 5.0) for s in ctx.batch_shots]
        if isinstance(min_dur, (int, float)):
            seg_durs = [max(float(min_dur), d) for d in raw_durs]
        else:
            seg_durs = raw_durs
        total = sum(seg_durs)
        if isinstance(max_dur, (int, float)) and total > max_dur:
            logger.warning(
                "dispatch_payload: r2v_multi total %.1fs > %ds; scaling segments",
                total,
                int(max_dur),
            )
            scale = float(max_dur) / total
            seg_durs = [d * scale for d in seg_durs]
            total = float(max_dur)
        duration = int(total)
        segment_shot_ids = [s.shot_id for s in ctx.batch_shots]
        segment_starts = [
            round(t, 2) for t in itertools.accumulate(seg_durs, initial=0.0)
        ]
        segment_timestamps = segment_starts[:-1]
        expected_segment_timestamps = [
            (segment_starts[i], segment_starts[i + 1]) for i in range(len(seg_durs))
        ]
    else:
        if ctx.duration_s is not None:
            duration_src = float(ctx.duration_s)
        elif ctx.shot and ctx.shot.duration_s is not None:
            duration_src = float(ctx.shot.duration_s)
        else:
            duration_src = 5.0
        if duration_src <= 0:
            raise DispatchPayloadError(
                f"{ctx.shot_id}: duration_s must be > 0, got {duration_src}"
            )
        duration = int(duration_src)
        if isinstance(min_dur, (int, float)) and duration < int(min_dur):
            logger.warning(
                "dispatch_payload: %s duration %ds floored to %ds (model %s min)",
                ctx.shot_id,
                duration,
                int(min_dur),
                model_id,
            )
            duration = int(min_dur)
        if isinstance(max_dur, (int, float)) and duration > int(max_dur):
            logger.warning(
                "dispatch_payload: capping %s duration %ds -> %ds (model %s max)",
                ctx.shot_id,
                duration,
                int(max_dur),
                model_id,
            )
            duration = int(max_dur)

    # Aspect ratio precedence: explicit ctx > shot > default.
    # Mirrors the precedence of other ctx fields (prompt, start_frame_path,
    # model_id). Prior version inverted this — shot.aspect_ratio always won
    # over ctx.aspect_ratio when both were set.
    if ctx.aspect_ratio:
        aspect_ratio = ctx.aspect_ratio
    elif ctx.shot and ctx.shot.aspect_ratio:
        aspect_ratio = ctx.shot.aspect_ratio
    else:
        aspect_ratio = DEFAULT_ASPECT_RATIO

    # === 3. Start frame resolution ===
    # Explicit wins. Otherwise resolve from sidecar (narrative path) for i2v only.
    start_frame: Optional[Path] = None
    image_tail_b64: Optional[str] = None

    if ctx.start_frame_path is not None:
        # Explicit override path. Match legacy _resolve_start_frame behavior:
        # validate existence unconditionally so CLI and narrative callers
        # both fail-fast on a missing file. (Legacy _resolve_start_frame
        # raised FileNotFoundError on override miss regardless of caller.)
        start_frame = Path(ctx.start_frame_path).expanduser()
        if not start_frame.exists():
            raise FileNotFoundError(
                f"{ctx.shot_id}: start_frame_override {start_frame} does not exist"
            )
    elif ctx.shot is not None and ctx.modality == "video_i2v":
        start_frame = _resolve_start_frame(
            shot=ctx.shot,
            project=ctx.project,
            override=None,
        )

    # === 4. References — explicit wins; else collect from shot ===
    if ctx.board_gated and not ctx.board_ref_path:
        raise MissingBoardRefError(ctx.shot_id)

    ref_images: list[str]
    ref_manifest: dict[str, int]
    if ctx.reference_image_paths is not None:
        ref_images = [str(p) for p in ctx.reference_image_paths]
        ref_manifest = {}
    elif ctx.shot is not None:
        ref_collect_batch = ctx.batch_shots if ctx.modality == "r2v_multi" else None
        ref_images, ref_manifest = _collect_reference_images(
            ctx.shot,
            ctx.project,
            ctx.modality,
            batch_shots=ref_collect_batch,
            board_gated=ctx.board_gated,
            board_ref_path=ctx.board_ref_path,
        )
    else:
        ref_images = []
        ref_manifest = {}
    board_ref_path = _resolve_board_ref_path(ctx)
    if board_ref_path:
        # Board rides as one extra ref after the canonical collected-ref cap.
        # Keep it past the provider cap by clipping optional Pass-2 backfill
        # first; hero/location manifest bindings survive, and the board is
        # always the final @Image slot.
        collected_ref_count = min(len(ref_images), 8)
        ref_images = ref_images[:collected_ref_count] + [board_ref_path]
        ref_manifest = {
            k: v for k, v in ref_manifest.items() if v <= collected_ref_count
        }
        ref_manifest["board_1"] = len(ref_images)

    ref_videos: list[str] = []
    if ctx.reference_video_paths is not None:
        ref_videos = [str(p) for p in ctx.reference_video_paths]

    # === 5. Routing fields that must be effective before authoring ===
    # These fields affect provider selection in VideoModelClient._dict_to_unified
    # / resolve_adapter, so the cap projection must use the same resolved values
    # as the final payload.
    audio_flag = ctx.generate_audio
    if ctx.shot is not None:
        raw_audio = ctx.shot.raw.get("generate_audio")
        if raw_audio is not None:
            audio_flag = bool(raw_audio)
    if audio_flag and profile.get("supports_audio", False) is not True:
        logger.warning(
            "dispatch_payload: %s — generate_audio dropped; model %s does not "
            "advertise supports_audio",
            ctx.shot_id,
            model_id,
        )
        audio_flag = False

    negative_prompt: Optional[str] = None
    supports_neg = profile.get("supports_negative_prompt", False)
    if ctx.negative_prompt is not None:
        if supports_neg:
            negative_prompt = ctx.negative_prompt
        else:
            logger.warning(
                "dispatch_payload: %s — explicit negative_prompt dropped; "
                "model %s does not advertise supports_negative_prompt",
                ctx.shot_id,
                model_id,
            )
    elif ctx.shot is not None and supports_neg:
        negative_prompt = ctx.shot.raw.get("negative_prompt") or None

    # === 5. Prompt resolution: explicit wins; else build from shot ===
    prompt_result: AuthorPromptResult | None = None
    if ctx.prompt is not None:
        prompt = ctx.prompt
    elif ctx.shot is not None:
        end_frame_for_author = ctx.end_frame_path
        if end_frame_for_author is None:
            end_frame_for_author = ctx.shot.raw.get("end_frame")
        prompt_result = _build_author_aware_prompt(
            ctx,
            model_id=model_id,
            ref_manifest=ref_manifest,
            ref_images=ref_images,
            start_frame=start_frame,
            end_frame=end_frame_for_author,
            generate_audio=audio_flag,
            negative_prompt=negative_prompt,
            resolution=resolution,
            tier=tier,
            segment_timestamps=segment_timestamps,
            primitive_segment_timestamps=expected_segment_timestamps
            or segment_timestamps,
            bible=ctx.bible or {},
        )
        prompt = prompt_result.prompt
    else:
        raise DispatchPayloadError(
            "PayloadContext requires either ctx.prompt OR ctx.shot to build a prompt."
        )
    prompt = _append_prompt_directive(prompt, ctx.prompt_directive)
    if board_ref_path:
        board_index = ref_manifest["board_1"]
        segment_count = len(ctx.batch_shots) if ctx.batch_shots else 1
        prompt = _insert_board_line(
            prompt,
            board_index=board_index,
            segment_count=segment_count,
        )

    bound_i2v_refs = _author_bound_i2v_refs(prompt_result, shot_id=ctx.shot_id)
    if bound_i2v_refs:
        start_frame = Path(str(bound_i2v_refs["start_frame"])).expanduser()
        ctx.start_frame_path = start_frame

    # === 6. end_frame_path / image_tail base64 (i2v only) ===
    if ctx.modality == "video_i2v":
        if bound_i2v_refs.get("image_tail") is not None:
            image_tail_b64 = _serialize_image_tail_ref(
                bound_i2v_refs["image_tail"],
                shot_id=ctx.shot_id,
                require_local_file=True,
            )
        elif ctx.end_frame_path is not None:
            ep = Path(ctx.end_frame_path).expanduser()
            if not ep.exists():
                raise FileNotFoundError(f"{ctx.shot_id}: end_frame {ep} does not exist")
            image_tail_b64 = base64.b64encode(ep.read_bytes()).decode()
        elif ctx.shot is not None:
            end_frame_path = ctx.shot.raw.get("end_frame")
            if end_frame_path:
                ep = Path(end_frame_path).expanduser()
                if not ep.exists():
                    raise FileNotFoundError(
                        f"{ctx.shot_id}: end_frame {ep} does not exist"
                    )
                image_tail_b64 = base64.b64encode(ep.read_bytes()).decode()

    # === 9. provider_hints ===
    hints: dict[str, Any] = {"tier": tier}
    if ctx.endpoint:
        hints["endpoint"] = ctx.endpoint
    if ctx.modality == "r2v_multi":
        hints["r2v_multi"] = True
        if ctx.batch_shots:
            hints["segment_count"] = len(ctx.batch_shots)
    if ctx.multi_shots:
        hints["multi_shots"] = True

    # === 10. inputs_snapshot ===
    inputs_snapshot = {
        "shot_id": ctx.shot_id,
        "model_id": model_id,
        "tier": tier,
        "modality": ctx.modality,
        "project": ctx.project,
        "episode": ctx.episode,
        "previs_model": ctx.shot.previs_model if ctx.shot else None,
    }

    _enforce_provider_prompt_cap(
        prompt,
        model_id=model_id,
        reference_images=ref_images,
        generate_audio=audio_flag,
        negative_prompt=negative_prompt,
        image=start_frame,
        image_tail=image_tail_b64,
        resolution=resolution,
        tier=tier,
        shot_id=ctx.shot_id,
        modality=ctx.modality,
    )

    # === 11. Assemble payload (KEY ORDER matters for fixture parity) ===
    payload: dict[str, Any] = {
        "shot_id": ctx.shot_id,
        "prompt": prompt,
        "model": model_id,
        "duration": duration,
        "aspect_ratio": aspect_ratio,
        "generate_audio": audio_flag,
        "provider_hints": hints,
        "inputs_snapshot": inputs_snapshot,
        "elements_payload": ctx.elements_payload,
    }
    # start_frame is already validated (via override exists-check or
    # _resolve_start_frame). No defensive re-check — trust the resolver.
    if ctx.modality == "video_i2v" and start_frame is not None:
        payload["start_frame"] = str(start_frame)
    if image_tail_b64 is not None:
        payload["image_tail"] = image_tail_b64
    if ref_images:
        payload["reference_images"] = ref_images
    if board_ref_path:
        payload["ref_manifest"] = ref_manifest
    if ref_videos:
        payload["reference_videos"] = ref_videos
    if ctx.image_urls_payload is not None:
        payload["image_urls_payload"] = ctx.image_urls_payload
    if negative_prompt:
        payload["negative_prompt"] = negative_prompt
    if segment_shot_ids is not None:
        payload["segment_shot_ids"] = segment_shot_ids
    if expected_segment_timestamps is not None:
        payload["expected_segment_timestamps"] = expected_segment_timestamps
    payload["gate_results"] = {}
    payload["prompt_layers"] = {}
    if (
        os.environ.get("RECOIL_IDENTITY_GATE") == "1"
        and ctx.modality in ("video_i2v", "r2v_multi")
    ):
        gate_ref_paths: list[Path] = []
        for ref in ref_images:
            try:
                ref_path = Path(ref).expanduser()
                if ref_path.exists():
                    gate_ref_paths.append(ref_path)
            except OSError:
                continue
        if gate_ref_paths:
            # Store a SERIALIZABLE spec (ref-path strings), NOT the gate callable:
            # the payload is part of the persisted Workflow (save_scene -> json.dumps)
            # and must round-trip. The runner builds the gate from this spec at
            # dispatch time.
            payload["identity_gate_ref_paths"] = [str(p) for p in gate_ref_paths]
        else:
            logger.info(
                "dispatch_payload: RECOIL_IDENTITY_GATE=1 but no file-backed refs "
                "resolved for %s; skipping identity gate",
                ctx.shot_id,
            )

    _apply_filter_safety_shadow(ctx, payload)
    return payload


def build_dispatch_payload(
    *,
    shot=None,
    project: str,
    modality: str = "video_i2v",
    model_override: Optional[str] = None,
    tier_override: Optional[str] = None,
    generate_audio: Optional[bool] = None,
    bible: Optional[dict] = None,
    episode: Optional[str] = None,
    batch_shots=None,
    start_frame_override: Optional[Path] = None,
    dry_run: bool = False,
    skip_author: bool = False,
    prompt_directive: Optional[str] = None,
    grouping: Optional[dict[str, Any]] = None,
    generation_config: Optional[dict[str, Any]] = None,
    element_config: Optional[dict[str, Any]] = None,
    strategy_override: Optional[str] = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Translate a CanonicalShot into the unified dispatch payload dict.

    Args:
        shot:             The single CanonicalShot (or head of a batch).
        project:          Project slug, e.g. "tartarus".
        modality:         "video_i2v" (per-shot) or "r2v_multi" (batched).
        model_override:   Forces a specific model_id (e.g. "kling-v3").
                          Otherwise: shot.video_model, then DEFAULT_VIDEO_MODEL.
                          NEVER promotes shot.previs_model.
        tier_override:    Forces a specific tier (e.g. "fast_720p").
        generate_audio:   Tri-state: True/False forces; None uses
                          NARRATIVE_DEFAULT_GENERATE_AUDIO. Per-shot
                          shot.raw["generate_audio"] always wins.
        bible:            Optional bible dict for prompt builder.
        episode:          Episode id (carried in inputs_snapshot).
        batch_shots:      r2v_multi only — full list of CanonicalShots.
        start_frame_override: Explicit Path; bypasses sidecar lookup.
        skip_author:      Dry-run only; resolve payload refs without prose authoring.
        grouping:         Explicit grouping identity for honest filenames/provenance.
        generation_config: Coverage generation config to preserve in payload metadata.
        element_config:   Coverage element config to preserve in payload metadata.

    Returns:
        Dict ready to pass into dispatch("video_i2v", payload, ctx).

    Raises:
        FileNotFoundError: state says approved but file is missing.
        DispatchPayloadError: required fields missing or malformed.

    REIMPLEMENTED 2026-05-25 as a thin wrapper that builds a PayloadContext
    and delegates to build_unified_payload. Signature is UNCHANGED — all
    pre-existing call sites continue to work without modification.
    """
    # Dry-run gate for the audit scaffolding (Phase 10). Behavior preserved
    # exactly — _build_audit_payload remains the canonical audit shape.
    if dry_run:
        audit_kwargs = dict(kwargs)
        audit_kwargs["board_gated"] = bool(kwargs.get("board_gated", False))
        if grouping is not None:
            audit_kwargs["grouping"] = grouping
        if generation_config is not None:
            audit_kwargs["generation_config"] = generation_config
        if element_config is not None:
            audit_kwargs["element_config"] = element_config
        return _build_audit_payload(
            shot=shot,
            project=project,
            modality=modality,
            model_override=model_override,
            tier_override=tier_override,
            generate_audio=generate_audio,
            bible=bible,
            episode=episode,
            batch_shots=batch_shots,
            skip_author=skip_author,
            kwargs=audit_kwargs,
        )

    if modality not in NARRATIVE_MODALITIES:
        raise DispatchPayloadError(
            f"Unsupported modality {modality!r}; expected one of {NARRATIVE_MODALITIES}"
        )
    if not isinstance(shot, CanonicalShot):
        raise DispatchPayloadError(
            f"shot must be CanonicalShot, got {type(shot).__name__}"
        )
    if not shot.shot_id:
        raise DispatchPayloadError("CanonicalShot has empty shot_id")
    if modality == "r2v_multi" and not batch_shots:
        raise DispatchPayloadError(
            "r2v_multi modality requires batch_shots — pass list of CanonicalShot"
        )

    # Build a PayloadContext from the CanonicalShot and delegate. The narrative
    # path leaves all CLI-only fields (prompt, start_frame_path, refs) at None
    # so build_unified_payload resolves them from project state. This preserves
    # the legacy behavior exactly (Phase 2 parity test enforces byte-identical
    # output for representative fixtures).
    ctx = PayloadContext(
        project=project,
        modality=modality,
        shot_id=shot.shot_id,
        # Pre-resolved fields: all None — narrative path resolves from shot.
        prompt=None,
        start_frame_path=start_frame_override,  # only override; else resolve from sidecar
        end_frame_path=None,  # resolved from shot.raw["end_frame"] inside builder
        reference_image_paths=None,  # resolved from shot.characters
        reference_video_paths=None,
        negative_prompt=None,  # resolved from shot.raw["negative_prompt"] if supported
        board_ref_path=kwargs.get("board_ref_path"),
        board_gated=bool(kwargs.get("board_gated", False)),
        # Generation parameters
        model_id=model_override,  # None → falls back to shot.video_model → DEFAULT
        duration_s=(float(shot.duration_s) if shot.duration_s is not None else None),
        aspect_ratio=(shot.aspect_ratio if shot.aspect_ratio else DEFAULT_ASPECT_RATIO),
        generate_audio=(
            generate_audio
            if generate_audio is not None
            else NARRATIVE_DEFAULT_GENERATE_AUDIO
        ),
        # Modality-specific (narrative path never sets these)
        elements_payload=None,
        multi_shots=False,
        # Narrative context
        shot=shot,
        batch_shots=batch_shots,
        prompt_directive=prompt_directive,
        # Routing / provenance
        tier=tier_override,  # None → falls back to DEFAULT_TIER
        endpoint=None,
        episode=episode,
        bible=bible,
        strategy_override=strategy_override,
    )
    payload = build_unified_payload(ctx)
    if isinstance(grouping, dict):
        payload["grouping"] = dict(grouping)
    if isinstance(generation_config, dict):
        payload["generation_config"] = dict(generation_config)
        for key in (
            "tier", "seed", "resolution", "format_mode",
            "anchor_duration_s", "regen_previz_for_segment",
        ):
            if key in generation_config:
                val = generation_config[key]
                if val is not None:
                    payload[key] = val
                else:
                    payload.pop(key, None)
        for hint_key in ("seed", "tier"):
            if generation_config.get(hint_key) is not None:
                payload.setdefault("provider_hints", {})[hint_key] = (
                    generation_config[hint_key]
                )
    if isinstance(element_config, dict):
        existing = payload.setdefault("element_config", {})
        if isinstance(existing, dict):
            existing.update(element_config)
        else:
            payload["element_config"] = dict(element_config)
    return payload


def _resolve_start_frame(
    *,
    shot: CanonicalShot,
    project: str,
    override: Optional[Path],
) -> Path:
    """Locate the approved previs PNG for shot.shot_id.

    Resolution order (Law 1 — two SSOTs, NO convention scan):
      1. override — explicit Path (testing / manual dispatch).
      2. State sidecar: projects/{project}/state/visual/shots/{shot_id}.json,
         field gate_results.hero_frame (preferred) or output_path (fallback).
         Both store project-relative paths.

    Returns:
        Path — guaranteed to exist (raises FileNotFoundError otherwise).

    Raises:
        FileNotFoundError: override missing OR sidecar absent OR sidecar
            field missing OR file at resolved path missing.
    """
    if override is not None:
        p = Path(override).expanduser()
        if not p.exists():
            raise FileNotFoundError(
                f"{shot.shot_id}: start_frame_override {p} does not exist"
            )
        return p

    # Use ProjectPaths.visual_state_dir instead of constructing "state/visual"
    # manually — keeps STATE_NAMESPACE SSOT honored when the namespace is
    # rolled back via pipeline_config.json (Debug Round 1 Issue #3).
    sidecar = (
        ProjectPaths.for_project(project).visual_state_dir
        / "shots"
        / f"{shot.shot_id}.json"
    )
    if not sidecar.exists():
        raise FileNotFoundError(
            f"{shot.shot_id}: no state sidecar at {sidecar}. "
            f"Plan needs previs generation OR start_frame_override must be set."
        )
    try:
        data = json.loads(sidecar.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as e:
        raise FileNotFoundError(
            f"{shot.shot_id}: sidecar {sidecar} unreadable: {e}"
        ) from e

    rel: Optional[str] = None
    gate = data.get("gate_results") or {}
    if isinstance(gate, dict) and gate.get("hero_frame"):
        rel = str(gate["hero_frame"])
    elif data.get("output_path"):
        rel = str(data["output_path"])
    if not rel:
        raise FileNotFoundError(
            f"{shot.shot_id}: sidecar {sidecar} has no "
            f"gate_results.hero_frame or output_path; "
            f"approved start_frame is unresolved."
        )

    resolved = (projects_root() / project / rel).resolve()
    if not resolved.exists():
        raise FileNotFoundError(
            f"{shot.shot_id}: state says approved start_frame={rel!r} "
            f"but file is missing at {resolved}"
        )
    return resolved


# ──────────────────────────────────────────────────────────────────────────
# Composite sheet path — ADDITIVE, env-var-gated (2026-05-25)
# ──────────────────────────────────────────────────────────────────────────
# When RECOIL_USE_COMPOSITE_SHEETS=1 AND all required sheets exist on disk at
# the canonical paths, _collect_reference_images returns sheet refs (one per
# entity) instead of the two-pass angle collection. If any required sheet is
# missing, falls back silently to the existing behavior. When the env var is
# unset (the default), this code is a no-op — zero behavior change for any
# current caller. Generate sheets via tools/generate_composite_sheet.py.
#
# Per consultations/recoil/gpt-image-2-seedance-2-best-practices-2026-05-25/
# SYNTHESIS.md: Seedance 2 treats a single composite sheet as one input with
# spatial attention over the grid — bypasses the "Identity Blend" failure mode
# triggered by multiple separate same-subject refs.

_SHEETS_ENV_VAR = "RECOIL_USE_COMPOSITE_SHEETS"
_SHEETS_PROJECT_CONFIG_KEY = "use_composite_sheets"

# Sheet existence + validity (the 100KB / PNG-magic / >=512px floor that rejects the
# REC-31/REC-33 clobber class) now lives in recoil.core.ref_resolver.resolve_sheet_asset
# (the canonical sheet resolver). Corrupt sheets raise ref_errors.SheetIntegrityError.


def composite_sheets_enabled(project: str) -> bool:
    """Whether the composite-sheet path is active for this project.

    Two activation channels (either enables):
      1. Env var ``RECOIL_USE_COMPOSITE_SHEETS=1`` — ad-hoc / per-fire override
         (e.g. testing the substrate, debugging a specific call). Wins over
         project config when set.
      2. ``project_config.json::use_composite_sheets: true`` — persistent
         per-project opt-in. JT doesn't have to remember the env var for
         every production fire once the project has been validated.

    Default off when neither channel activates — preserves the zero-behavior
    -change guarantee for any project that hasn't opted in.
    """
    if os.environ.get(_SHEETS_ENV_VAR) == "1":
        return True
    # Route through the module-level _project_config_cache (the pattern used
    # by the prompt-builder branches at lines 319 + 1059) so per-shot calls
    # in a batched dispatch don't each hit disk, and so cache coherence
    # between this gate and downstream consumers is preserved.
    cfg = _project_config_cache.get(project)
    if cfg is None:
        try:
            cfg = load_project_config(project) or {}
        except Exception:  # noqa: BLE001 — config load is opportunistic, never blocks
            cfg = {}
        else:
            _project_config_cache[project] = cfg
    return bool(cfg.get(_SHEETS_PROJECT_CONFIG_KEY))


# Backward-compat alias: the gate was private (`_composite_sheets_enabled`) before
# REC-213 C3 promoted it to a PUBLIC shared activation surface consumed by board_builder.
_composite_sheets_enabled = composite_sheets_enabled


def _collect_sheet_refs(
    shot: CanonicalShot,
    project: str,
    batch_shots: list[CanonicalShot] | None,
) -> tuple[list[str], dict[str, int]] | None:
    """Composite-sheet ref collection via the canonical bundle lineage.

    Returns (refs, manifest) when composite sheets are enabled (env var OR
    project_config) AND every required entity has a valid sheet; returns None to
    fall back to the angle collector (ALL-OR-NOTHING — a single missing sheet
    parks the whole shot on angles). A corrupt sheet raises SheetIntegrityError
    (loud abort before spend). Sheet existence + validity + layout all live in
    recoil.core.ref_resolver.resolve_sheet_asset / ProjectPaths.sheet_path.

    Manifest mirrors the angle path: identity_N for the Nth character (1-indexed),
    scene_1 for the location.
    """
    if not composite_sheets_enabled(project):
        return None

    project_paths = ProjectPaths.for_project(project)
    sources: list[CanonicalShot] = batch_shots if batch_shots else [shot]

    # Build the deduped character roster (same logic as the main collector).
    seen: set[str] = set()
    char_ids: list[str] = []
    for src in sources:
        for entry in src.characters:
            if isinstance(entry, CharacterEntry):
                cid = entry.char_id
            elif isinstance(entry, dict):
                cid = str(entry.get("char_id", "")).strip().upper()
            else:
                cid = str(entry).strip().upper()
            if cid and cid not in seen:
                seen.add(cid)
                char_ids.append(cid)

    # Required sheets: one per character, plus the shot's location if set.
    required: list[tuple[str, str]] = [("char", cid) for cid in char_ids]
    if shot.location_id:
        required.append(("loc", str(shot.location_id)))

    if not required:
        # No entities to back via sheets — let the main collector handle the
        # (likely env-only) shot via its existing path.
        return None

    # STRICTLY PER-ENTITY, then concatenate. canonical_ref_images() returns exactly
    # (sheet,) per bundle, so a multi-sheet bundle would drop all but the first.
    refs: list[str] = []
    for cls, entity_id in required:
        asset = resolve_sheet_asset(project_paths, cls, entity_id)  # corrupt -> raises
        if asset is None:
            # Missing sheet → fall back to angle-based collection for the WHOLE shot.
            # Loud warning (naming the canonical path that was absent) so a forgotten
            # sheet is visible and a silent fall into the broken base/pool read
            # (REC-76) is impossible.
            logger.warning(
                "dispatch_payload: composite sheet MISSING for %s/%s at %s — "
                "falling back to angle-based collection",
                cls,
                entity_id,
                project_paths.sheet_path(cls, entity_id),
            )
            return None
        sheet = ReferenceBundle((asset,)).canonical_ref_images(prefer_sheet=True)[0]
        refs.append(str(sheet.path))

    # Build manifest mirroring the angle-path convention.
    manifest: dict[str, int] = {}
    for i, _cid in enumerate(char_ids, start=1):
        manifest[f"identity_{i}"] = i  # 1-based, matches refs order
    if shot.location_id:
        manifest["scene_1"] = len(char_ids) + 1

    logger.info(
        "dispatch_payload: USING COMPOSITE SHEETS for %s (%d sheets)",
        shot.shot_id,
        len(refs),
    )
    return refs, manifest


def _collect_reference_images(
    shot: CanonicalShot,
    project: str,
    modality: str,
    batch_shots: list[CanonicalShot] | None = None,
    board_gated: bool = False,
    board_ref_path: str | None = None,
) -> tuple[list[str], dict[str, int]]:
    """Collect canonical character + location refs for a shot or batch.

    Delegates to recoil.core.ref_resolver — the SSOT. Returns a tuple of
    (absolute path strings capped at 9, ref_manifest position map).
    `ref_manifest` is {logical_key: 1-based ImageN token index}, e.g.
    `{"identity_1": 1, "identity_2": 6, "scene_1": 11}`. Prompt builders
    consult it to resolve "@Image{identity_1}" placeholders to "@Image1".

    R5 A1 fix (2026-05-21): two-pass insertion so the 9-ref cap doesn't
    strand a character with one ref against five. Pass 1 inserts each
    character's hero only (N refs for N characters), then the location
    refs, then Pass 2 back-fills front + profile + threequarter + back
    for each character in declaration order. The manifest binds
    identity_N to the hero position from Pass 1—invariant preserved.

    Bug B fix (2026-05-19): the prior body looked in
    `projects/<project>/refs/characters/<slug>/picks/frontal.png`. The
    canonical (v2) layout is `projects/<project>/assets/identity/<slug>/`
    and the canonical filenames are `{hero, front, profile, threequarter,
    back}` per §10g. resolve_character_bundle handles the cascade.

    Defense-in-depth (SYNTHESIS §3): even for `video_i2v` we ship the
    full character + location ref set in addition to the start frame.
    The start frame anchors frame 1; the ref set anchors identity across
    the temporal window. The prior `if modality == "video_i2v": return []`
    short-circuit is removed.

    Character union: when `batch_shots` is provided (r2v_multi case), the
    union of characters across ALL shots in the batch is used—deduped
    by char_id in declaration order.

    Wardrobe phase: forwarded from `CharacterEntry.wardrobe_phase_id` to
    resolve_character_bundle(phase=...). Falls back to None (base hero +
    angles) when the entry doesn't specify a phase.

    Returns:
        tuple[list[str], dict[str, int]]
            - list of absolute paths, length ≤ 9.
            - manifest of logical_key -> 1-based position in the list.
    """
    project_paths = ProjectPaths.for_project(project)
    refs: list[str] = []
    ref_manifest: dict[str, int] = {}

    sources: list[CanonicalShot] = batch_shots if batch_shots else [shot]

    # Build the deduped character roster across the batch, preserving
    # declaration order. Stash (cid, phase, all_paths_ordered) for both
    # passes below.
    seen_chars: set[str] = set()
    required_subjects: list[str] = []
    bundle_assets = []
    char_specs: list[tuple[str, Optional[str], str, list[str], list[str]]] = []
    for src in sources:
        for entry in src.characters:
            # Phase-1 invariant: entry is a CharacterEntry. Tolerate legacy
            # str/dict shapes defensively in case an upstream caller bypassed
            # plan_loader (test fixtures, dispatch_cli ad-hoc payloads).
            if isinstance(entry, CharacterEntry):
                cid = entry.char_id
                phase = entry.wardrobe_phase_id
            elif isinstance(entry, dict):
                cid = str(entry.get("char_id", "")).strip().upper()
                phase = entry.get("wardrobe_phase_id")
            else:
                cid = str(entry).strip().upper()
                phase = None
            if not cid or cid in seen_chars:
                continue
            seen_chars.add(cid)
            required_subjects.append(cid)
            bundle_c = resolve_character_bundle(project_paths, cid, phase=phase)
            bundle_assets.extend(bundle_c.assets)
            hero_assets = [
                a for a in bundle_c.by_role("identity")
                if a.is_hero
            ]
            shelf_view_paths = [
                str(a.path) for a in bundle_c.assets
                if a.kind in ("turn", "fullbody")
            ]
            angle_paths = [
                str(a.path) for a in bundle_c.assets
                if not a.is_hero
            ]
            # REC-31 grounding marker: the old fail-open branch was
            # `if not hero_assets: continue`; identity_N now comes only from
            # the bundle hero asset, and missing heroes are handled by the
            # fail-closed gate below.
            if hero_assets:
                char_specs.append(
                    (cid, phase, str(hero_assets[0].path), angle_paths, shelf_view_paths)
                )

    bundle = ReferenceBundle(tuple(bundle_assets))

    def _assert_ref_gate() -> None:
        raw = shot.raw if isinstance(getattr(shot, "raw", None), dict) else {}
        assert_refs_complete(
            shot_id=shot.shot_id,
            required_subjects=required_subjects,
            bundle=bundle,
            board_gated=board_gated,
            board_ref_path=board_ref_path,
            refless_declared=raw.get("refless", False),
        )

    # Additive composite-sheet path (env-var-gated, 2026-05-25). Returns early
    # if RECOIL_USE_COMPOSITE_SHEETS=1 AND all required sheets exist on disk.
    # Falls back to the angle-based collection below otherwise.
    sheet_result = _collect_sheet_refs(shot, project, batch_shots)
    if sheet_result is not None:
        _assert_ref_gate()
        return sheet_result

    # ── Pass 1: hero only, one per character ──
    # This guarantees @Image{identity_N} resolves for every character even
    # when the cap is tight. char_ordinal counts from 1.
    for char_ordinal, (_cid, _phase, hero, _angle_paths, _shelf) in enumerate(
        char_specs, start=1
    ):
        ref_manifest[f"identity_{char_ordinal}"] = len(refs) + 1
        refs.append(hero)

    # ── Location refs (single block, after all heroes) ──
    if shot.location_id:
        loc_refs = resolve_location_refs(project_paths, str(shot.location_id))
        loc_paths = list(loc_refs.values())
        if loc_paths:
            ref_manifest["scene_1"] = len(refs) + 1
            for p in loc_paths:
                refs.append(str(p))
        else:
            # Unhydrated location: location_id is set but no ref images were found.
            # scene_1 will be absent from ref_manifest; prompt_engine will omit the
            # location declaration and "in @ImageN" clause rather than alias @Image1.
            logger.warning(
                "location_id=%s set on shot %s but no ref images found — "
                "location will be omitted from prompt (unhydrated).",
                shot.location_id,
                getattr(shot, "shot_id", "?"),
            )

    # ── Pass 2: back-fill angles (front/profile/threequarter/back) per char ──
    # Round-robin by angle index so if the cap clips, we lose the LAST
    # angle of the LAST character—not all angles of one character.
    max_angles_per_char = max(
        (len(s or a) for _c, _p, _h, a, s in char_specs),
        default=0,
    )
    for angle_idx in range(1, max_angles_per_char + 1):
        for _cid, _phase, _hero, angle_paths, shelf_view_paths in char_specs:
            eff = shelf_view_paths or angle_paths
            if angle_idx <= len(eff):
                refs.append(eff[angle_idx - 1])

    # Cap refs at 9; prune any manifest entries that landed beyond the cap
    # so consumers never reference a position that doesn't exist.
    refs = refs[:9]
    ref_manifest = {k: v for k, v in ref_manifest.items() if v <= len(refs)}
    _assert_ref_gate()
    return refs, ref_manifest


def _coerce_canonical_shot(s) -> CanonicalShot:
    """Coerce a dict (audit fixture) or CanonicalShot into a CanonicalShot.

    Synthetic audit fixtures pass dict shots; production callers pass
    CanonicalShot already. This is the only entry point that crosses the
    boundary, so the coercion lives here rather than in plan_loader.
    """
    if isinstance(s, CanonicalShot):
        return s
    if not isinstance(s, dict):
        raise DispatchPayloadError(
            f"_coerce_canonical_shot: shot must be dict or CanonicalShot, "
            f"got {type(s).__name__}"
        )
    asset = s.get("asset_data") or {}
    routing = s.get("routing_data") or {}
    chars_raw = asset.get("characters") or s.get("characters") or []
    char_entries: list[CharacterEntry] = []
    for c in chars_raw:
        if isinstance(c, CharacterEntry):
            char_entries.append(c)
        elif isinstance(c, dict):
            cid = str(c.get("char_id") or c.get("name") or "").strip().upper()
            if cid:
                char_entries.append(
                    CharacterEntry(
                        char_id=cid,
                        wardrobe_phase_id=c.get("wardrobe_phase_id"),
                    )
                )
        elif isinstance(c, str):
            cid = c.strip().upper()
            if cid:
                char_entries.append(CharacterEntry(char_id=cid))
    return CanonicalShot(
        shot_id=str(s.get("shot_id") or ""),
        scene_index=int(s.get("scene_index") or 0),
        sequence_id=s.get("sequence_id"),
        pipeline=s.get("pipeline"),
        previs_model=s.get("previs_model"),
        video_model=s.get("video_model") or s.get("model"),
        location_id=asset.get("location_id") or s.get("location_id"),
        characters=char_entries,
        shot_type=s.get("shot_type") or routing.get("shot_type"),
        duration_s=s.get("duration_s"),
        is_env_only=bool(s.get("is_env_only", False)),
        has_dialogue=bool(s.get("has_dialogue", False)),
        aspect_ratio=s.get("aspect_ratio") or routing.get("aspect_ratio"),
        raw=dict(s),
        cinematography=s.get("cinematography"),
        quality=s.get("quality"),
    )


def _build_audit_payload(
    *,
    shot,
    project: str,
    modality: str,
    model_override: Optional[str],
    tier_override: Optional[str],
    generate_audio: Optional[bool],
    bible: Optional[dict],
    episode: Optional[str],
    batch_shots,
    skip_author: bool,
    kwargs: dict,
) -> dict:
    """Build a real-shaped payload for the audit scaffolding (Phase 10).

    Does NOT do disk I/O (no start_frame existence check, no state sidecar
    read, no end_frame base64). Honors modifier kwargs:
        force_has_dialogue: bool
        aspect_ratio_override: str
        force_no_start_frame: bool
        force_no_refs: bool
    """
    force_has_dialogue = bool(kwargs.get("force_has_dialogue"))
    aspect_ratio_override = kwargs.get("aspect_ratio_override")
    force_no_start_frame = bool(kwargs.get("force_no_start_frame"))
    force_no_refs = bool(kwargs.get("force_no_refs"))
    grouping = kwargs.get("grouping")
    generation_config = kwargs.get("generation_config")
    element_config = kwargs.get("element_config")

    cs = _coerce_canonical_shot(shot)
    batch_cs: Optional[list[CanonicalShot]] = None
    if batch_shots:
        batch_cs = [_coerce_canonical_shot(s) for s in batch_shots]

    if force_has_dialogue:
        cs = _with(cs, has_dialogue=True)
        if batch_cs:
            batch_cs = [_with(b, has_dialogue=True) for b in batch_cs]

    shot_id = cs.shot_id or "AUDIT_UNKNOWN"

    model_id = model_override or cs.video_model or DEFAULT_VIDEO_MODEL
    try:
        profile = get_profile(model_id)
    except KeyError:
        profile = {}

    tier = tier_override or DEFAULT_TIER
    resolution = "720p"
    if isinstance(generation_config, dict) and generation_config.get("resolution"):
        resolution = str(generation_config["resolution"])
    min_dur = profile.get("min_duration_seconds")
    max_dur = profile.get("max_duration_seconds")

    segment_shot_ids: Optional[list[str]] = None
    expected_segment_timestamps: Optional[list[tuple[float, float]]] = None
    segment_timestamps: Optional[list[float]] = None

    if modality == "r2v_multi":
        sources = batch_cs or [cs]
        raw_durs = [float(s.duration_s or 5.0) for s in sources]
        if isinstance(min_dur, (int, float)):
            seg_durs = [max(float(min_dur), d) for d in raw_durs]
        else:
            seg_durs = raw_durs
        total = sum(seg_durs)
        if isinstance(max_dur, (int, float)) and total > max_dur:
            scale = float(max_dur) / total
            seg_durs = [d * scale for d in seg_durs]
            total = float(max_dur)
        duration = int(total)
        segment_shot_ids = [s.shot_id for s in sources]
        segment_starts = [
            round(t, 2) for t in itertools.accumulate(seg_durs, initial=0.0)
        ]
        segment_timestamps = segment_starts[:-1]
        expected_segment_timestamps = [
            (segment_starts[i], segment_starts[i + 1]) for i in range(len(seg_durs))
        ]
    else:
        d = cs.duration_s if cs.duration_s is not None else 5.0
        duration = int(d)
        if isinstance(min_dur, (int, float)) and duration < int(min_dur):
            duration = int(min_dur)
        if isinstance(max_dur, (int, float)) and duration > int(max_dur):
            duration = int(max_dur)

    aspect_ratio = aspect_ratio_override or cs.aspect_ratio or DEFAULT_ASPECT_RATIO

    # Refs: skip if force_no_refs; otherwise resolve via the real path.
    if bool(kwargs.get("board_gated")) and not kwargs.get("board_ref_path"):
        raise MissingBoardRefError(shot_id)

    if force_no_refs:
        ref_images: list[str] = []
        ref_manifest: dict[str, int] = {}
    else:
        ref_collect_batch = batch_cs if modality == "r2v_multi" else None
        ref_images, ref_manifest = _collect_reference_images(
            cs,
            project,
            modality,
            batch_shots=ref_collect_batch,
            board_gated=bool(kwargs.get("board_gated")),
            board_ref_path=kwargs.get("board_ref_path"),
        )

    # Board ref parity with the live path: the audit/dry-run payload feeds
    # revalidate_succeeded_fingerprints, so an approved board MUST change the
    # audit payload's reference_images exactly as it changes the live one —
    # that fingerprint delta is what demotes a succeeded take for re-dispatch.
    audit_board_ref = _resolve_board_artifact(project, kwargs.get("board_ref_path"))
    if audit_board_ref:
        collected_ref_count = min(len(ref_images), 8)
        ref_images = ref_images[:collected_ref_count] + [audit_board_ref]
        ref_manifest = {
            k: v for k, v in ref_manifest.items() if v <= collected_ref_count
        }
        ref_manifest["board_1"] = len(ref_images)

    # Start frame: skip I/O; use shot.raw["start_frame"] as-is, or None.
    start_frame: Optional[str] = None
    if modality == "video_i2v" and not force_no_start_frame:
        sf_raw = cs.raw.get("start_frame")
        if sf_raw:
            start_frame = str(sf_raw)

    # generate_audio / negative_prompt resolution mirrors the real path and
    # must happen before authoring because both fields participate in provider
    # capability selection.
    raw_audio = cs.raw.get("generate_audio")
    if raw_audio is not None:
        audio_flag = bool(raw_audio)
    elif generate_audio is None:
        audio_flag = NARRATIVE_DEFAULT_GENERATE_AUDIO
    else:
        audio_flag = bool(generate_audio)
    if audio_flag and profile.get("supports_audio", True) is not True:
        logger.debug(
            "dispatch_payload audit: %s — preserving narrative generate_audio "
            "default for model %s despite profile supports_audio=%r",
            shot_id,
            model_id,
            profile.get("supports_audio"),
        )
    # R4 revert—see §27 / 2026-05-16. No dialogue-off override.

    negative_prompt: Optional[str] = None
    if profile.get("supports_negative_prompt", False):
        negative_prompt = cs.raw.get("negative_prompt") or None

    prompt = ""
    prompt_result: AuthorPromptResult | None = None
    if modality in NARRATIVE_MODALITIES and not skip_author:
        if project not in _project_config_cache:
            _project_config_cache[project] = load_project_config(project)
        try:
            audit_batch = batch_cs if batch_cs else ([cs] if modality == "r2v_multi" else None)
            primitive_ctx = PayloadContext(
                project=project,
                modality=modality,
                shot_id=shot_id,
                shot=cs,
                batch_shots=audit_batch,
                duration_s=float(duration),
                bible=bible,
            )
            prompt_result = _build_author_aware_prompt(
                primitive_ctx,
                model_id=model_id,
                ref_manifest=ref_manifest,
                ref_images=ref_images,
                start_frame=start_frame,
                end_frame=cs.raw.get("end_frame"),
                generate_audio=audio_flag,
                negative_prompt=negative_prompt,
                resolution=resolution,
                tier=tier,
                segment_timestamps=segment_timestamps,
                primitive_segment_timestamps=expected_segment_timestamps
                or segment_timestamps,
                bible=bible or {},
                project_config=_project_config_cache.get(project) or {},
            )
            prompt = prompt_result.prompt
        except Exception as exc:
            if _audit_requires_author_prompt(modality, model_id):
                raise
            logger.debug("audit dry_run prompt build skipped: %s", exc)
            prompt = ""

    bound_i2v_refs = _author_bound_i2v_refs(prompt_result, shot_id=shot_id)
    image_tail: Optional[str] = None
    if bound_i2v_refs:
        start_frame = str(bound_i2v_refs["start_frame"])
        if bound_i2v_refs.get("image_tail") is not None:
            image_tail = _serialize_image_tail_ref(
                bound_i2v_refs["image_tail"],
                shot_id=shot_id,
                require_local_file=False,
            )

    hints: dict[str, Any] = {"tier": tier}
    if modality == "r2v_multi":
        hints["r2v_multi"] = True
        sources = batch_cs or [cs]
        hints["segment_count"] = len(sources)

    inputs_snapshot = {
        "shot_id": shot_id,
        "model_id": model_id,
        "tier": tier,
        "modality": modality,
        "project": project,
        "episode": episode,
        "previs_model": cs.previs_model,
    }

    # Canonical filename — best-effort; the audit's #7 only checks when present.
    # For r2v_multi, only emit when the caller provides the real grouping
    # identity. Guessing would recreate the old false PASS_001 audit name.
    output_filename: Optional[str] = None
    try:
        from recoil.core.naming import build_filename

        ep_match = re.search(r"(\d+)", str(episode or "0"))
        ep_token = int(ep_match.group(1)) if ep_match else 0
        shot_ids_for_name = segment_shot_ids if segment_shot_ids else [shot_id]
        grouping_strategy = None
        grouping_ordinal = None
        if isinstance(grouping, dict):
            grouping_strategy = grouping.get("strategy")
            grouping_ordinal = grouping.get("ordinal")
        elif grouping is not None:
            grouping_strategy = getattr(grouping, "strategy", None)
            grouping_ordinal = getattr(grouping, "ordinal", None)

        if grouping_strategy and grouping_ordinal is not None:
            output_filename = build_filename(
                episode=ep_token,
                strategy=str(grouping_strategy),
                ordinal=int(grouping_ordinal),
                shot_ids=shot_ids_for_name,
                take=1,
            )
        elif modality != "r2v_multi":
            output_filename = build_filename(
                episode=ep_token,
                strategy="solo",
                ordinal=0,
                shot_ids=shot_ids_for_name,
                take=1,
            )
    except Exception:
        output_filename = None

    payload: dict[str, Any] = {
        "shot_id": shot_id,
        "prompt": prompt,
        "model": model_id,
        "modality": modality,
        "duration": duration,
        "aspect_ratio": aspect_ratio,
        "generate_audio": audio_flag,
        "provider_hints": hints,
        "inputs_snapshot": inputs_snapshot,
        "elements_payload": None,
        "reference_images": ref_images,
        "ref_manifest": ref_manifest,
        "start_frame": start_frame,
        "gate_results": {},
        "prompt_layers": {},
        "dry_run": True,
    }
    if image_tail is not None:
        payload["image_tail"] = image_tail
    if output_filename:
        payload["output_filename"] = output_filename
    if negative_prompt:
        payload["negative_prompt"] = negative_prompt
    if segment_shot_ids is not None:
        payload["segment_shot_ids"] = segment_shot_ids
    if expected_segment_timestamps is not None:
        payload["expected_segment_timestamps"] = expected_segment_timestamps
        # Bug Q: prompt-side timestamps mirror expected_segment_timestamps.
        payload["prompt_segment_timestamps"] = expected_segment_timestamps
    if isinstance(grouping, dict):
        payload["grouping"] = dict(grouping)
    if isinstance(generation_config, dict):
        payload["generation_config"] = dict(generation_config)
        for key in (
            "tier", "seed", "resolution", "format_mode",
            "anchor_duration_s", "regen_previz_for_segment",
        ):
            if key in generation_config:
                val = generation_config[key]
                if val is not None:
                    payload[key] = val
                else:
                    payload.pop(key, None)
        for hint_key in ("seed", "tier"):
            if generation_config.get(hint_key) is not None:
                payload.setdefault("provider_hints", {})[hint_key] = (
                    generation_config[hint_key]
                )
    if isinstance(element_config, dict):
        payload["element_config"] = dict(element_config)

    _enforce_provider_prompt_cap(
        payload.get("prompt") or "",
        model_id=model_id,
        reference_images=ref_images,
        generate_audio=audio_flag,
        negative_prompt=negative_prompt,
        image=start_frame,
        image_tail=image_tail,
        resolution=str(payload.get("resolution") or resolution),
        tier=tier,
        shot_id=shot_id,
        modality=modality,
    )

    # ========================================================================
    # Convergence audit assertions (added 2026-05-25 by payload_assembly convergence)
    # SYNTHESIS: consultations/recoil/payload-assembly-convergence-2026-05-25/
    # ========================================================================

    # Assertion 1: R2V Multi — prompt safety (placeholder leakage)
    if modality == "r2v_multi":
        assert "@Image{" not in (payload.get("prompt") or ""), (
            f"Raw @Image{{}} placeholder leaked into r2v_multi prompt for {payload.get('shot_id')}"
        )

    # Assertion 2: R2V Multi — elements_payload shape (when non-None)
    # NOTE: legacy code sets elements_payload=None unconditionally; only check shape when truthy.
    if modality == "r2v_multi" and payload.get("elements_payload") is not None:
        assert isinstance(payload["elements_payload"], dict), (
            f"r2v_multi payload elements_payload must be dict; got {type(payload['elements_payload']).__name__}"
        )

    # NOTE: prior Wan R2V audit assertions (multi_shots + non-empty refs)
    # were removed — audit_dispatch._prompt_modality_to_dispatch_modality
    # never emits "video_wan_r2v", so the assertions were unreachable. Add
    # back when the audit enumerator includes the wan modality.

    # Assertion 5: Model resolution — never None
    assert payload.get("model") is not None, (
        f"Payload for {modality} has None model after resolution (shot_id={payload.get('shot_id')})"
    )

    # Assertion 6: Reference path type safety — all strings (JSON-safe)
    for ref in payload.get("reference_images", []):
        assert isinstance(ref, str), (
            f"reference_images contains non-string: {type(ref).__name__} in {payload.get('shot_id')}"
        )
    for ref in payload.get("reference_videos", []):
        assert isinstance(ref, str), (
            f"reference_videos contains non-string: {type(ref).__name__} in {payload.get('shot_id')}"
        )

    return payload


def _with(cs: CanonicalShot, **overrides) -> CanonicalShot:
    """Return a shallow copy of `cs` with selected fields overridden."""
    from dataclasses import replace

    new_raw = dict(cs.raw)
    for k, v in overrides.items():
        new_raw[k] = v
    return replace(cs, raw=new_raw, **overrides)


__all__ = [
    "build_dispatch_payload",
    "composite_sheets_enabled",
    "DispatchPayloadError",
    "DEFAULT_VIDEO_MODEL",
    "DEFAULT_TIER",
    "DEFAULT_ASPECT_RATIO",
    "NARRATIVE_MODALITIES",
    "NARRATIVE_DEFAULT_GENERATE_AUDIO",
]
