"""Audit assertions — Phase 10 full implementations.

Wired by `audit_dispatch.py`. Each assertion takes
(payload, modality, model, shot, **_) and either returns None (pass)
or raises AssertionError (fail). Soft returns (e.g. assertion #12) are
intentional under synthetic-fixture conditions and carry a comment
explaining why.
"""

from __future__ import annotations

import json
import re

# Matches the opening of a templated reference placeholder ("@Image{").
# assert_no_unhydrated_ref_tokens searches the prompt with this pattern and
# fails when a placeholder of that shape is still present.
_UNHYDRATED_REF_TOKEN = re.compile(r"@Image\{")


def assert_no_exceptions(payload, modality, model, shot, **_) -> None:  # 1
    """Fail when no payload was produced.

    Raises:
        AssertionError: if ``payload`` is ``None``; the message labels this
            as a failed build.
    """
    if payload is not None:
        return
    raise AssertionError(f"[{modality}|{model}] payload is None (build failed)")


def assert_reference_images_complete(payload, modality, model, shot, **_) -> None:  # 2
    """Require a non-empty ``reference_images`` list for video modalities.

    Only ``video_i2v`` and ``r2v_multi`` are checked; every other modality
    passes without the payload being inspected.

    Raises:
        AssertionError: if ``reference_images`` is missing or empty.
    """
    needs_refs = modality in {"video_i2v", "r2v_multi"}
    # Short-circuit keeps the payload lookup off the non-video path.
    if needs_refs and not (payload.get("reference_images") or []):
        raise AssertionError(f"[{modality}|{model}] reference_images empty")


def assert_no_unhydrated_ref_tokens(payload, modality, model, shot, **_) -> None:  # 3
    """Fail when the prompt still contains a templated ``@Image{`` placeholder.

    Applies to every modality; a missing or empty prompt passes.

    Raises:
        AssertionError: if the placeholder pattern is found. The message
            includes the first 200 characters of the prompt.
    """
    text = payload.get("prompt") or ""
    leaked = _UNHYDRATED_REF_TOKEN.search(text) is not None
    if not leaked:
        return
    raise AssertionError(
        f"[{modality}|{model}] unhydrated @Image{{...}} token: {text[:200]!r}"
    )


# Cinematic / equipment / brand caps that legitimately appear in bound prompts.
# These are NOT character/place proper nouns and must NOT trip the scan (the fal
# content filter cares about person/place names, not film-stock or camera brands).
# Grounded in the live PROMPT_BIBLE cinematic vocabulary + builder output.
_SAFE_PROPER_CAPS = frozenset({
    "Kodak", "Vision3", "Ektachrome", "Portra", "Fujifilm", "Fuji", "Cinestill",
    "Arri", "Alexa", "Cooke", "Zeiss", "Panavision", "Steadicam", "Technicolor",
    "Kelvin", "Rembrandt",  # lighting terms
})

# Shot-size / framing descriptor words that begin a `_SHOT_TYPE_NAMES` label
# (e.g. "Medium shot", "Wide shot", "Extreme close-up") — builders emit these
# capitalized after a comma, so they read as interior caps but are cinematic
# vocabulary, not proper nouns.
_SAFE_SHOT_WORDS = frozenset({
    "Medium", "Wide", "Close", "Long", "Full", "Extreme", "Big", "Very", "Insert",
    "Establishing", "Master", "Cinematic", "Camera",
})

# Tokens that read as capitalized but are not proper nouns: shot-type / format
# abbreviations, quality markers, and resolution tokens. Digit-led entries
# ("4K", "8K") are matched by the scanner against the word plus any digits
# glued directly in front of it.
_SAFE_CAPS_TOKENS = frozenset({
    "WS", "MS", "CU", "ECU", "MCU", "BCU", "MFS", "MLS", "MWS", "LS", "FS", "EWS",
    "VLS", "WIDE", "INSERT", "OTS", "POV", "VFX", "HDR", "UI", "4K", "8K",
    "HD", "RED", "T", "D",  # film-stock tungsten/daylight suffixes (e.g. 500T, 250D)
})

# A word token: a run of ASCII letters optionally followed by digits (e.g.
# "Vision3"). The pattern itself is case-agnostic; the scanner inspects the
# first letter for capitalization and skips the pronoun "I" explicitly.
_CAP_WORD = re.compile(r"[A-Za-z][A-Za-z]*\d*")
# Sentence-boundary punctuation after which the next capitalized word is expected.
_SENTENCE_END = frozenset(".!?:;\n")


def assert_no_proper_nouns(payload, modality, model, shot, **_) -> None:  # 4
    """Hard-fail on ANY surviving capitalized proper noun in a bound video
    prompt (REC-72 D0d §29) — not just KNOWN character names.

    Only ``video_i2v`` and ``r2v_multi`` are checked; an empty prompt passes.

    Two subsets, both fail-closed:
      1. Known character names from shot.asset_data.characters (original
         contract, preserved) — raises even when sentence-initial.
      2. Name-set-INDEPENDENT scan: any INTERIOR (non-sentence-initial)
         capitalized word that is not an allowlisted cinematic/equipment
         brand, shot-size descriptor, ref token (@ImageN), or format/
         resolution abbreviation. Digit-led abbreviations such as "4K" are
         checked together with the digits glued in front of the letters.

    KNOWN LIMITATION: subset 2 cannot flag a proper noun that is itself
    sentence-initial (e.g. a prompt literally starting "Alice crosses ...")
    without a dictionary, because legitimate prose starts sentences with
    capitalized common words ("The", "Camera", "Cinematic"). Flagging every
    sentence-initial cap would false-positive on real builder output. Subset 1
    still catches sentence-initial KNOWN names; subset 2 catches interior
    UNKNOWN names — which is the realistic leak shape past fal's content
    filter (a stray name surviving mid-sentence).

    Raises:
        AssertionError: when a known character name or an un-allowlisted
            interior capitalized word is found in the prompt.
    """
    if modality not in {"video_i2v", "r2v_multi"}:
        return
    prompt = payload.get("prompt") or ""
    if not prompt:
        return

    # ── Subset 1: known character names (preserved behavior) ───────────────
    # `or {}` tolerates an explicit `asset_data: None`, matching the sibling
    # assertions that read the same structure.
    asset_data = (shot or {}).get("asset_data") or {}
    for char in asset_data.get("characters") or []:
        if isinstance(char, str):
            name = char
        elif isinstance(char, dict):
            name = char.get("name") or char.get("char_id")
        else:
            name = getattr(char, "name", None)
        if name and re.search(rf"\b{re.escape(name)}\b", prompt):
            raise AssertionError(
                f"[{modality}|{model}] proper noun {name!r} present in prompt"
            )

    # ── Subset 2: name-set-INDEPENDENT scan (§29) ──────────────────────────
    # Flag a capitalized word that is NOT sentence-initial and NOT allowlisted.
    # Skipped (legitimate caps): sentence-initial words, the pronoun "I",
    # @ImageN ref tokens, cinematic/equipment brand caps, and shot/format/
    # resolution abbreviations.
    sentence_initial = True  # first word of the prompt counts as sentence-initial
    i = 0
    n = len(prompt)
    while i < n:
        ch = prompt[i]
        # @ImageN ref token — skip the whole token, not a proper noun.
        if ch == "@":
            j = i + 1
            while j < n and prompt[j].isalnum():
                j += 1
            i = j
            sentence_initial = False
            continue
        if ch.isalpha():
            # Read the full word token; a non-ASCII letter the pattern cannot
            # match is treated as a one-character word.
            m = _CAP_WORD.match(prompt, i)
            word = m.group(0) if m else ch
            end = i + len(word)
            if word[0].isupper() and not sentence_initial:
                # Digits immediately before the word were stepped over one by
                # one; re-attach them so "4K"/"8K" match their allowlist entry
                # instead of being judged as the bare letter "K".
                start = i
                while start > 0 and prompt[start - 1].isdigit():
                    start -= 1
                glued = prompt[start:end]
                stripped = word.rstrip("s")  # tolerate trailing plural 's'
                allowlisted = (
                    word == "I"
                    or word in _SAFE_CAPS_TOKENS
                    or word.upper() in _SAFE_CAPS_TOKENS
                    or glued in _SAFE_CAPS_TOKENS
                    or word in _SAFE_PROPER_CAPS
                    or stripped in _SAFE_PROPER_CAPS
                    or word in _SAFE_SHOT_WORDS
                )
                if not allowlisted:
                    raise AssertionError(
                        f"[{modality}|{model}] unknown proper noun {word!r} present "
                        f"in prompt (name-set-independent scan §29): {prompt[max(0, i - 30):i + 40]!r}"
                    )
            # After any alphabetic word, we are no longer sentence-initial.
            sentence_initial = False
            i = end
            continue
        # Non-alpha character: only sentence-ending punctuation re-arms the
        # sentence-initial flag; digits, spaces, hyphens, commas and quotes
        # leave it untouched.
        if ch in _SENTENCE_END:
            sentence_initial = True
        i += 1


def assert_start_frame_serializable(payload, modality, model, shot, **_) -> None:  # 5
    """For ``video_i2v`` payloads, require ``start_frame`` to be a str or None.

    Other modalities pass without the payload being inspected.

    Raises:
        AssertionError: if ``start_frame`` has any other type; the message
            names the offending type.
    """
    if modality == "video_i2v":
        frame = payload.get("start_frame")
        if not (frame is None or isinstance(frame, str)):
            raise AssertionError(
                f"[{modality}|{model}] start_frame must be str or None, got {type(frame).__name__}"
            )


def assert_segment_timestamps_match(payload, modality, model, shot, **_) -> None:  # 6
    """For ``r2v_multi`` payloads, compare expected vs. prompt segment timestamps.

    The check is skipped when either ``expected_segment_timestamps`` or
    ``prompt_segment_timestamps`` is absent. Both values are converted to
    lists before comparison, so differing sequence types compare by content.

    Raises:
        AssertionError: if the two sequences differ.
    """
    if modality != "r2v_multi":
        return
    want = payload.get("expected_segment_timestamps")
    got = payload.get("prompt_segment_timestamps")
    both_present = want is not None and got is not None
    if both_present and list(want) != list(got):
        raise AssertionError(
            f"[{modality}|{model}] timestamps mismatch: expected={want} prompt={got}"
        )


def assert_canonical_filename(payload, modality, model, shot, **_) -> None:  # 7
    """Validate the payload's output filename against ``FILENAME_PATTERN``.

    The filename is read from ``output_filename``, falling back to
    ``filename``. When neither yields a value the check is skipped.

    Raises:
        AssertionError: if the filename does not match the pattern imported
            from ``recoil.core.naming``.
    """
    # Imported inside the function, as in the rest of this module's
    # project-dependent assertions.
    from recoil.core.naming import FILENAME_PATTERN

    candidate = payload.get("output_filename") or payload.get("filename")
    if candidate is not None and not FILENAME_PATTERN.match(candidate):
        raise AssertionError(f"[{modality}|{model}] non-canonical filename: {candidate!r}")


def assert_model_not_unknown(payload, modality, model, shot, **_) -> None:  # 8
    """Fail when the payload carries the placeholder model name ``unknown``.

    Two signals are checked: the payload's ``model`` field equal to
    ``"unknown"``, or the substring ``"_unknown_"`` in the output filename
    (``output_filename``, falling back to ``filename``).

    Raises:
        AssertionError: if either signal is present.
    """
    declared = payload.get("model") or ""
    filename = payload.get("output_filename") or payload.get("filename") or ""
    if declared == "unknown" or "_unknown_" in filename:
        raise AssertionError(f"[{modality}|{model}] model=='unknown'")


def assert_sidecar_populated(payload, modality, model, shot, **_) -> None:  # 9
    """Placeholder for sidecar provenance within the synthetic audit.

    Sidecar provenance is verified separately in test_sidecar_provenance.py,
    and sidecar materialization happens post-dispatch, so this assertion
    never raises on its own: it only reads ``reference_images`` for the
    video modalities and returns. Empty refs are already reported by
    assertion #2.
    """
    video_modalities = {"video_i2v", "r2v_multi"}
    if modality in video_modalities:
        has_refs = bool(payload.get("reference_images") or [])
        if has_refs:
            # Refs were supplied; nothing further is checked at this stage.
            return None
    return None


def assert_audio_on_default_for_narrative(
    payload, modality, model, shot, **_
) -> None:  # 10
    """Require ``generate_audio`` to be exactly True on narrative video payloads.

    R4 revert (§27): with no explicit override, the default is True
    regardless of has_dialogue; CP-8 lipsync stacks on top rather than
    replacing it.

    The check is skipped for non-video modalities and for rows where
    ``explicit_generate_audio_override`` is exactly True (the audit fixture
    sets it when a per-shot or caller-supplied value bypasses the default).

    Raises:
        AssertionError: if ``generate_audio`` is anything other than True.
    """
    if modality not in {"video_i2v", "r2v_multi"}:
        return
    if payload.get("explicit_generate_audio_override") is True:
        return
    audio_flag = payload.get("generate_audio")
    if audio_flag is True:
        return
    raise AssertionError(
        f"[{modality}|{model}] generate_audio default must be True per §27 "
        f"(got {audio_flag!r}; R3 dialogue-off branch should be deleted)"
    )


def assert_payload_json_serializable(payload, modality, model, shot, **_) -> None:  # 11
    """Fail when the payload cannot be dumped to JSON.

    ``default=str`` stringifies values the encoder does not know, so the
    remaining failures are the ``TypeError``/``ValueError`` cases that
    ``json.dumps`` still raises.

    Raises:
        AssertionError: chained from the underlying encoder error.
    """
    try:
        json.dumps(payload, default=str)
    except (TypeError, ValueError) as encode_error:
        detail = f"[{modality}|{model}] payload not JSON-serializable: {encode_error}"
        raise AssertionError(detail) from encode_error


def assert_refs_exist_and_min_resolution(
    payload, modality, model, shot, **_
) -> None:  # 12
    """Intentional soft pass: always returns None without inspecting anything."""
    # SOFT RETURN: synthetic 1x1 PNGs in audit_fixtures/ are intentionally
    # below the production 1024px floor. In real run_overnight this assertion
    # would enforce; the audit shape-fixture suite explicitly excludes it.
    return None


def assert_budget_preflight_ok(payload, modality, model, shot, **_) -> None:  # 13
    """Reject prompts longer than the cap reported by ``_resolve_provider_cap``.

    Non-dict payloads are skipped. The cap is resolved from the model plus
    the payload fields passed below; when the resolver returns None no
    limit is enforced.

    Raises:
        AssertionError: if the prompt's character count exceeds the cap.
    """
    if not isinstance(payload, dict):
        return

    from recoil.pipeline._lib.dispatch_payload import _resolve_provider_cap

    provider_hints = payload.get("provider_hints") or {}
    tier = provider_hints.get("tier") if isinstance(provider_hints, dict) else None
    cap_inputs = {
        "reference_images": list(payload.get("reference_images") or []),
        "generate_audio": bool(payload.get("generate_audio", False)),
        "negative_prompt": payload.get("negative_prompt"),
        # The start frame stands in when no explicit `image` is present.
        "image": payload.get("image") or payload.get("start_frame"),
        "image_tail": payload.get("image_tail"),
        "resolution": payload.get("resolution") or "720p",
        "tier": tier,
    }
    cap = _resolve_provider_cap(model, **cap_inputs)
    prompt_text = payload.get("prompt") or ""
    if cap is None or len(prompt_text) <= cap:
        return
    raise AssertionError(
        f"[{modality}|{model}] prompt is {len(prompt_text)} chars, over provider cap {cap}"
    )


# Process-wide memo for _load_prompt_bible(); None means "not loaded yet".
_PROMPT_BIBLE_CACHE = None


def _load_prompt_bible() -> dict:
    """Lazy-load PROMPT_BIBLE.yaml and memoize the result.

    Reads the YAML directly — does not import recoil.config.prompt_bible
    (which doesn't expose a get_prompt_rules helper).

    Returns:
        The parsed mapping. An empty dict is returned (and cached) on any
        error, and also when the YAML top level is not a mapping, so callers
        always receive a dict.
    """
    global _PROMPT_BIBLE_CACHE
    if _PROMPT_BIBLE_CACHE is not None:
        return _PROMPT_BIBLE_CACHE
    try:
        import yaml
        from pathlib import Path

        path = Path(__file__).resolve().parents[2] / "config" / "PROMPT_BIBLE.yaml"
        # Explicit UTF-8: the platform default encoding can mis-decode
        # non-ASCII characters in the file.
        loaded = yaml.safe_load(path.read_text(encoding="utf-8"))
    except Exception:
        # Deliberate best-effort: a missing PyYAML, missing file or parse
        # error must not abort the audit — the callers fall back to defaults.
        loaded = None
    _PROMPT_BIBLE_CACHE = loaded if isinstance(loaded, dict) else {}
    return _PROMPT_BIBLE_CACHE


def assert_focal_length_opt_in(payload, modality, model, shot, **_) -> None:  # 14
    """Fail when a lens focal-length cue appears without opt-in.

    ``include_focal_length`` is looked up in the prompt bible, first in the
    block keyed by ``model`` and then in ``global_defaults``; it defaults to
    False. When it is truthy the prompt is not inspected. Only ``video_i2v``
    and ``r2v_multi`` are checked.

    Raises:
        AssertionError: if the prompt contains an ``<N>mm`` token that is
            not immediately followed by a film-gauge/stock word.
    """
    if modality not in {"video_i2v", "r2v_multi"}:
        return
    prompt = payload.get("prompt") or ""
    bible = _load_prompt_bible()
    if isinstance(bible, dict):
        model_block = bible.get(model)
        global_rules = bible.get("global_defaults", {})
    else:
        model_block = None
        global_rules = {}
    model_rules = model_block if isinstance(model_block, dict) else {}
    if not isinstance(global_rules, dict):
        global_rules = {}

    # The model-specific setting wins over the global default.
    if "include_focal_length" in model_rules:
        include_focal = model_rules["include_focal_length"]
    else:
        include_focal = global_rules.get("include_focal_length", False)
    if include_focal:
        return

    # Match a LENS focal-length cue (e.g. "50mm", "50mm lens") but NOT a film
    # GAUGE / stock / grain descriptor (e.g. "35mm film grain", "16mm print").
    # Cinema modes inject film-gauge aesthetics that share the "<N>mm" token
    # with focal lengths; the negative lookahead drops the token only when
    # the next word is a film/stock term, so a real "50mm lens" leak still
    # trips the assertion.
    film_gauge_ctx = r"(?:film|grain|print|stock|negative|gauge|emulsion|pushed|push|reversal)"
    lens_cue = re.search(rf"\b\d+mm\b(?!\s+{film_gauge_ctx}\b)", prompt, re.IGNORECASE)
    if lens_cue is not None:
        raise AssertionError(
            f"[{modality}|{model}] focal-length cue in prompt despite include_focal_length=false"
        )


def assert_audio_t2a_payload_valid(payload, modality, model, shot, **_) -> None:  # 15
    """Require non-blank ``text`` and a non-None ``voice_id`` on the payload.

    Raises:
        AssertionError: for blank or missing text first, then for a missing
            ``voice_id``.
    """
    spoken = (payload.get("text") or "").strip()
    if not spoken:
        raise AssertionError(f"[{modality}|{model}] audio_t2a payload text empty")
    voice = payload.get("voice_id")
    if voice is None:
        raise AssertionError(f"[{modality}|{model}] audio_t2a missing voice_id")


def assert_lipsync_post_payload_valid(
    payload, modality, model, shot, **_
) -> None:  # 16
    """Require truthy ``video_path`` and ``audio_path`` on the payload.

    Raises:
        AssertionError: naming the first missing field (video is checked
            before audio).
    """
    for field in ("video_path", "audio_path"):
        if not payload.get(field):
            raise AssertionError(f"[{modality}|{model}] lipsync_post missing {field}")


# ---------------------------------------------------------------------------
# R4 additions — CLI-surface assertions (#17–#22)
# ---------------------------------------------------------------------------


def assert_single_shot_tag_derivation(
    payload, modality, model, shot, **_
) -> None:  # 17
    """Single-shot dispatch tag must be derived from shot.characters,
    not the SOLO_ENV default. A3 leak fix.

    The expected tag shape depends on the number of non-empty entries in
    ``shot.asset_data.characters``:

      * 0 characters  -> tag starts with ``SOLO_ENV``
      * 1 character   -> tag starts with ``SOLO`` but is not the bare
                         ``SOLO_ENV`` default
      * 2 characters  -> tag starts with ``DUO``
      * 3+ characters -> tag starts with ``MULTI_CHAR``

    Skipped for non-``video_i2v`` modalities, for payloads flagged
    ``r2v_multi`` (covered by #19), and when no tag was stamped.

    Raises:
        AssertionError: if the tag does not match the expected shape.
    """
    if modality != "video_i2v":
        return
    if payload.get("r2v_multi"):
        return  # this is multi-shot; covered by #19 instead
    tag = payload.get("tag") or ""
    if not tag:
        return  # no tag stamped on this payload (audit shim didn't compute it)
    chars = ((shot or {}).get("asset_data", {}) or {}).get("characters") or []
    n_chars = sum(1 for c in chars if c)
    if n_chars == 0:
        expected_prefix = "SOLO_ENV"
    elif n_chars == 1:
        expected_prefix = "SOLO_"
    elif n_chars == 2:
        expected_prefix = "DUO_"
    else:
        expected_prefix = "MULTI_CHAR"
    # A trailing underscore is optional, so compare against the bare prefix.
    matches = tag.startswith(expected_prefix.rstrip("_"))
    # "SOLO_ENV" also starts with "SOLO"; without this guard a one-character
    # shot that fell back to the environment default would pass — the exact
    # leak this assertion exists to catch.
    if n_chars == 1 and tag == "SOLO_ENV":
        matches = False
    if not matches:
        raise AssertionError(
            f"[{modality}|{model}] single-shot tag {tag!r} doesn't match shape "
            f"for {n_chars} character(s); expected to start with {expected_prefix!r}"
        )


def assert_single_shot_sidecar_populated(
    payload, modality, model, shot, **_
) -> None:  # 18
    """Check the provenance of a single-shot sidecar. A4 leak fix.

    Two things are verified on ``payload["_audit_sidecar_dict"]``:
    ``provenance.prompt_engine_version`` must be exactly 12 lowercase hex
    characters, and ``provenance.refs_used`` must be non-empty whenever the
    payload carries ``reference_images``.

    Skipped for non-``video_i2v`` modalities, for payloads flagged
    ``r2v_multi``, and when the audit shim supplied no sidecar dict.

    Raises:
        AssertionError: if either provenance check fails.
    """
    if modality != "video_i2v" or payload.get("r2v_multi"):
        return
    sidecar = payload.get("_audit_sidecar_dict")  # populated by audit_dispatch
    if sidecar is None:
        # Audit shim couldn't read sidecar; skip—production wires it in.
        return
    provenance = sidecar.get("provenance") or {}
    version = provenance.get("prompt_engine_version") or ""
    hex_digits = "0123456789abcdef"
    well_formed = len(version) == 12 and all(c in hex_digits for c in version)
    if not well_formed:
        raise AssertionError(
            f"[{modality}|{model}] sidecar.provenance.prompt_engine_version "
            f"must be 12-char SHA256 prefix, got {version!r}"
        )
    # refs_used must be a non-empty list when reference_images were supplied
    refs_supplied = payload.get("reference_images")
    if refs_supplied and not provenance.get("refs_used"):
        raise AssertionError(
            f"[{modality}|{model}] reference_images present but sidecar refs_used empty"
        )


def assert_shots_routes_to_r2v_multi(payload, modality, model, shot, **_) -> None:  # 19
    """Rows marked ``_audit_cli_surface == "shots_flag"`` must carry a truthy
    ``r2v_multi`` flag (multi-shot routing, NOT per-shot dispatch).

    Rows with any other CLI surface are skipped.

    Raises:
        AssertionError: if the row did not route to r2v_multi (A5 regression).
    """
    surface = payload.get("_audit_cli_surface") or ""
    if surface != "shots_flag":
        return
    routed = payload.get("r2v_multi")
    if routed:
        return
    raise AssertionError(
        f"[{modality}|{model}] --shots fixture row did NOT route to r2v_multi "
        f"(payload.r2v_multi={routed!r}); A5 regression"
    )


def assert_every_segment_writes_sidecar(
    payload, modality, model, shot, **_
) -> None:  # 20
    """For r2v_multi dispatches, every entry in ``_audit_segment_outputs``
    must report ``sidecar_present``. A4 + A5 leak fix (SH16 from BATCH_001
    had no sidecar).

    A dispatch counts as multi-shot when the modality is ``r2v_multi`` or
    the payload's ``r2v_multi`` flag is truthy. The check is skipped when no
    segment outputs are recorded.

    Raises:
        AssertionError: naming the ``shot_id`` of the first segment without
            a sidecar.
    """
    is_multi = modality == "r2v_multi" or payload.get("r2v_multi")
    if not is_multi:
        return
    outputs = payload.get("_audit_segment_outputs") or []
    # An empty list means the synthetic fixture did not exercise per-segment
    # outputs; the generator below then simply yields nothing.
    missing = next((seg for seg in outputs if not seg.get("sidecar_present")), None)
    if missing is not None:
        raise AssertionError(
            f"[{modality}|{model}] segment {missing.get('shot_id')} missing sidecar"
        )


def assert_generate_cli_dispatches(payload, modality, model, shot, **_) -> None:  # 21
    """generate.py --pass must dispatch rather than raise NotImplementedError. B1 fix.

    Only rows with ``_audit_cli_surface == "generate_cli"`` are checked. The
    captured ``_audit_cli_error`` text is searched for two marker strings.

    Raises:
        AssertionError: if either marker is present; the message carries the
            first 300 characters of the captured error text.
    """
    if (payload.get("_audit_cli_surface") or "") != "generate_cli":
        return
    stderr_text = payload.get("_audit_cli_error") or ""
    markers = ("NotImplementedError", "rewire to dispatch")
    if any(marker in stderr_text for marker in markers):
        raise AssertionError(
            f"[{modality}|{model}] generate.py --pass still raises NotImplementedError; "
            f"B1 regression. stderr excerpt: {stderr_text[:300]}"
        )


def assert_run_overnight_dry_run_completes(
    payload, modality, model, shot, **_
) -> None:  # 22
    """run_overnight --dry-run against real ep_001_passes.json must finish
    with return code 0. Removes the Phase 11 R3 xfail marker.

    Only rows with ``_audit_cli_surface == "run_overnight_dry"`` are
    checked, and only when the fixture recorded a return code.

    Raises:
        AssertionError: on a non-zero return code; the message carries the
            last 1000 characters of the captured error text.
    """
    if (payload.get("_audit_cli_surface") or "") != "run_overnight_dry":
        return
    returncode = payload.get("_audit_cli_returncode")
    # None means the fixture didn't run the smoke test; nothing to judge.
    if returncode is not None and returncode != 0:
        stderr_tail = (payload.get("_audit_cli_error") or "")[-1000:]
        raise AssertionError(
            f"[{modality}|{model}] run_overnight --dry-run rc={returncode}; stderr tail: {stderr_tail}"
        )


# ---------------------------------------------------------------------------
# R5 additions—production-leak assertions (#23–#26)
# ---------------------------------------------------------------------------


def assert_multi_char_refs_hydrated(payload, modality, model, shot, **_) -> None:  # 23
    """A1 leak guard: for multi-character r2v_multi / video_i2v shots, the
    payload's reference_images must contain at least one ref per character,
    the ref_manifest must contain identity_1..identity_N (one per character
    in the batch union), every manifest position must be ≤ len(reference_images),
    and the prompt body must contain the corresponding @Image{N} token for
    every N. R5 SYNTHESIS §1.2.

    Batch shots: for r2v_multi the batch is taken from a ``shots=[...]``
    keyword argument, or from a nested ``kwargs={"shots": [...]}`` keyword;
    when neither is supplied (and for video_i2v) only ``shot`` is used.

    Phase 2 extensions:
    - scene_1 presence check: when ANY shot in the batch has a non-empty
      location_id, the ref_manifest (if present) must contain scene_1.
      Applies to ALL char counts (not just multi-char).
    - Collision guard: no two distinct logical key types (identity_* vs
      scene_*) may resolve to the same @ImageN index. A collision means
      a character ref and a location ref share a slot, which causes the
      model to conflate them.

    Raises:
        AssertionError: when any of the checks above fails.
    """
    if modality not in {"video_i2v", "r2v_multi"}:
        return

    # ── Resolve the batch of shots ──────────────────────────────────────
    # `_` is the **kwargs dict itself, so a `shots=` keyword lands directly
    # in it; the nested `kwargs` form is still honored for callers that
    # forward their keyword dict as a single argument.
    shots = None
    if modality == "r2v_multi":
        shots = _.get("shots")
        if not (isinstance(shots, (list, tuple)) and shots):
            nested = _.get("kwargs")
            shots = nested.get("shots") if isinstance(nested, dict) else None
    if not (isinstance(shots, (list, tuple)) and shots):
        shots = [shot] if shot else []

    # ── Character roster: ordered union across the batch ────────────────
    roster: list = []
    seen_ids = set()
    for s in shots:
        asset_data = (s or {}).get("asset_data") or {}
        for c in asset_data.get("characters") or []:
            if isinstance(c, dict):
                cid = (c.get("char_id") or c.get("name") or "").strip().upper()
            else:
                cid = str(c).strip().upper()
            if cid and cid not in seen_ids:
                seen_ids.add(cid)
                roster.append(cid)
    n_chars = len(roster)

    refs = payload.get("reference_images") or []
    manifest = payload.get("ref_manifest") or {}

    if manifest:
        # ── Phase 2: scene_1 presence check (ALL char counts) ───────────
        # Missing scene_1 means the location token is silently omitted from
        # the prompt (A3 regression guard). `or {}` tolerates an explicit
        # `asset_data: None`, matching the roster loop above.
        has_location = any(
            bool(((s or {}).get("asset_data") or {}).get("location_id", ""))
            for s in shots
        )
        if has_location and "scene_1" not in manifest:
            raise AssertionError(
                f"[{modality}|{model}] A3 leak: shot has location_id but "
                f"ref_manifest is missing scene_1"
            )

        # ── Phase 2: collision guard (identity_* vs scene_* overlap) ────
        # Cross-type collision only; only the index values matter, so they
        # are collected as sets keyed by type prefix.
        id_vals = {
            v for k, v in manifest.items()
            if isinstance(v, int) and k.startswith("identity_")
        }
        sc_vals = {
            v for k, v in manifest.items()
            if isinstance(v, int) and k.startswith("scene_")
        }
        collisions = id_vals & sc_vals
        if collisions:
            raise AssertionError(
                f"[{modality}|{model}] A1/A3 collision: identity_* and scene_* "
                f"keys share @Image slot(s) {sorted(collisions)} in ref_manifest "
                f"{manifest!r} — each slot must map to exactly one logical key type"
            )

    # ── Original multi-char checks (n_chars >= 2) ───────────────────────
    if n_chars < 2:
        return  # single-character or env shots: scene_1 + collision checks above are enough
    if len(refs) < n_chars:
        raise AssertionError(
            f"[{modality}|{model}] A1 leak: {n_chars} characters but only "
            f"{len(refs)} reference_images"
        )
    # ref_manifest is stamped into the audit payload by _build_audit_payload;
    # in production the same dict flows. Some audit modifier paths drop it.
    if manifest:
        for n in range(1, n_chars + 1):
            key = f"identity_{n}"
            if key not in manifest:
                raise AssertionError(
                    f"[{modality}|{model}] A1 leak: ref_manifest missing {key}"
                )
            pos = manifest[key]
            if not isinstance(pos, int) or pos < 1 or pos > len(refs):
                raise AssertionError(
                    f"[{modality}|{model}] A1 leak: ref_manifest[{key}]={pos} "
                    f"out of range 1..{len(refs)}"
                )
    # Prompt body must contain the @Image{N} token for every N.
    # R5 carry-over (2026-05-21)—CLI-surface payloads (shot_flag, shots_flag,
    # generate_cli, run_overnight_dry) don't compile a prompt; they exercise
    # routing/sidecar/tag derivation, not prompt structure. When the payload
    # lacks a prompt body, the @ImageN check is not applicable and we skip.
    prompt = payload.get("prompt") or ""
    if not prompt:
        return
    for n in range(1, n_chars + 1):
        token = f"@Image{n}"
        # The digit lookahead keeps "@Image10" from satisfying "@Image1".
        if not re.search(rf"{re.escape(token)}(?!\d)", prompt):
            raise AssertionError(
                f"[{modality}|{model}] A1 leak: prompt missing {token} for "
                f"character {n} ({roster[n - 1]})"
            )


def _norm_quotes(s: str) -> str:
    """§R5 Gemini R1: normalize smart quotes → straight quotes so the
    `text in prompt` substring check is robust to plan-source quote drift.
    Plans often carry typographic curly quotes; builders emit straight.
    """
    return (s or "").replace("“", '"').replace("”", '"') \
                    .replace("‘", "'").replace("’", "'")


def assert_dialogue_clause_present(payload, modality, model, shot, **_) -> None:  # 24
    """A2 leak guard: the first dialogue line must appear in the prompt.

    Applies to ``video_i2v`` / ``r2v_multi`` when the shot is flagged as
    having dialogue (``routing_data.has_dialogue`` or ``has_dialogue``) and
    ``audio_data.dialogue`` has a first entry with non-empty text. The text
    (not the wrapping "speaks:" clause) is searched for as a substring, so
    phrasing refinements in the builders don't break the contract.
    R5 SYNTHESIS §1.3.

    §R5 Gemini R1: both sides go through ``_norm_quotes`` so curly quotes in
    plan sources don't produce false failures.

    Skipped when ``shot`` is None, when there is nothing to compare, and
    when the payload has no prompt body (CLI-surface fixtures exercise
    routing, not prompt content).

    Raises:
        AssertionError: if the dialogue text is absent from the prompt.
    """
    if shot is None or modality not in {"video_i2v", "r2v_multi"}:
        return
    # For r2v_multi the audit fixture passes a single representative shot.
    routing = (shot or {}).get("routing_data") or {}
    if not (routing.get("has_dialogue") or shot.get("has_dialogue")):
        return
    lines = ((shot or {}).get("audio_data") or {}).get("dialogue") or []
    if not lines:
        return
    head = lines[0]
    raw = (head.get("text") or "") if isinstance(head, dict) else head
    # Trim whitespace, then any wrapping straight quotes, then whitespace again.
    spoken = str(raw).strip().strip('"').strip("'").strip()
    if not spoken:
        return  # nothing to assert against
    prompt = payload.get("prompt") or ""
    if not prompt:
        return  # CLI-surface fixture without compiled prompt; skip
    if _norm_quotes(spoken) in _norm_quotes(prompt):
        return
    raise AssertionError(
        f"[{modality}|{model}] A2 leak: dialogue text {spoken!r} not "
        f"present in prompt body. prompt[:200]={prompt[:200]!r}"
    )


def assert_no_orphan_boundary_frames(payload, modality, model, shot, **_) -> None:  # 25
    """B2 leak guard: post-fire production check. R5 SYNTHESIS §1.6.

    For every ``boundary_frames/{stem}_seg<digits>.jpg`` under the inspected
    output directory, ``{stem}.mp4`` must exist directly in that output
    directory (next to ``boundary_frames/``, not inside it).

    Soft no-op when the payload carries no ``_audit_output_dir``, or when
    the output directory or its ``boundary_frames`` folder does not exist.

    Raises:
        AssertionError: listing the count and up to five orphan frame names.
    """
    inspected = (payload or {}).get("_audit_output_dir")
    if not inspected:
        return  # synthetic-fixture run; nothing to scan
    from pathlib import Path

    output_dir = Path(inspected)
    bf_dir = output_dir / "boundary_frames"
    if not output_dir.exists() or not bf_dir.exists():
        return

    # "EP001_SH10_take1_seg00.jpg" -> stem "EP001_SH10_take1"
    seg_name = re.compile(r"^(.+)_seg\d+\.jpg$")
    orphans: list[str] = []
    for frame in bf_dir.glob("*_seg*.jpg"):
        hit = seg_name.match(frame.name)
        if hit and not (output_dir / f"{hit.group(1)}.mp4").exists():
            orphans.append(frame.name)
    if not orphans:
        return
    raise AssertionError(
        f"[{modality}|{model}] B2 leak: {len(orphans)} orphan "
        f"boundary frames in {bf_dir}: {orphans[:5]}"
    )


def assert_segmentation_gated_on_video_success(payload, modality, model, shot, **_) -> None:  # 26
    """B3 leak guard: post-fire production check. For every
    boundary_frames/{stem}_seg*.jpg in the inspected output dir, the
    receipts.jsonl entry for the source video's pass_id must show
    ok=True. Soft-no-op when called outside --inspect-sidecars (no
    output_dir + receipts_log in payload). R5 SYNTHESIS §1.7 (B3 path b).

    Path-(b) framing: there is no real Gemini Flash segmentation pipeline
    in this codebase. Boundary frames are extracted by ffmpeg from the
    source video. This assertion still has value as a defense-in-depth
    invariant: a *_seg*.jpg should never persist on disk if its source
    video's dispatch receipt was a failure — when JT or the cleanup
    pipeline rejects the source video, the seg frames go with it. This
    assertion catches drift if that invariant breaks.

    Tolerance: a malformed or non-object line in the receipts log is
    skipped on its own rather than disabling the whole check, so one torn
    line cannot mask a failed pass recorded elsewhere in the file.
    Unreadable or malformed source sidecars are skipped; a MISSING source
    sidecar is reported.

    Raises:
        AssertionError: listing the count and up to five offending frames.
    """
    output_dir_str = (payload or {}).get("_audit_output_dir")
    receipts_path_str = (payload or {}).get("_audit_receipts_log")
    if not output_dir_str or not receipts_path_str:
        return
    from pathlib import Path

    output_dir = Path(output_dir_str)
    receipts_path = Path(receipts_path_str)
    bf_dir = output_dir / "boundary_frames"
    if not bf_dir.exists() or not receipts_path.exists():
        return

    # Build a {pass_id: ok} index from receipts. Last write wins per
    # pass_id (retries override earlier failures).
    try:
        receipt_lines = receipts_path.read_text(encoding="utf-8").splitlines()
    except (OSError, ValueError):
        return  # log unreadable/undecodable: stay a soft no-op
    ok_by_pass: dict = {}
    for line in receipt_lines:
        if not line.strip():
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # torn/partial line; keep the rest of the index
        if not isinstance(entry, dict):
            continue
        pid = entry.get("pass_id")
        if pid and isinstance(pid, (str, int)):
            ok_by_pass[pid] = bool(entry.get("ok"))

    bad: list[str] = []
    for seg in bf_dir.glob("*_seg*.jpg"):
        m = re.match(r"^(.+)_seg\d+\.jpg$", seg.name)
        if not m:
            continue
        stem = m.group(1)
        # The pass_id is read from the source sidecar's provenance. The
        # source video lives next to boundary_frames/, NOT inside it.
        source_sidecar = output_dir / f"{stem}.mp4.json"
        if not source_sidecar.exists():
            bad.append(f"{seg.name} (no source sidecar)")
            continue
        try:
            sc = json.loads(source_sidecar.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            continue
        prov = (sc.get("provenance") or {}) if isinstance(sc, dict) else {}
        if not isinstance(prov, dict):
            continue
        pid = prov.get("pass_id") or prov.get("dispatch_pass_id")
        if pid and isinstance(pid, (str, int)) and ok_by_pass.get(pid) is False:
            bad.append(f"{seg.name} (source pass {pid} ok=False)")
    if bad:
        raise AssertionError(
            f"[{modality}|{model}] B3 leak: {len(bad)} seg frames from "
            f"failed video passes: {bad[:5]}"
        )


# ── R6 Phase 10 — audit assertion #27 ────────────────────────────


def _is_allowed_value(value, allowed) -> bool:
    """Return True when *value* is a member of *allowed*.

    Unhashable values (e.g. a list or dict read from a malformed sidecar)
    make `in` raise TypeError against a set/frozenset; they are treated as
    plain non-members so the caller can report an AssertionError instead.
    """
    try:
        return value in allowed
    except TypeError:
        return False


def assert_sidecar_housekeeping_fields_present(payload, modality, model, shot, **_) -> None:  # 27
    """R6 Phase 10: verify sidecars carry the housekeeping fields that
    `populate_sidecar` (post-R6) emits. Soft-no-op when called outside
    --inspect-sidecars (no `_audit_sidecar_dict` in payload).

    Required fields:
      Top-level:
        schema_version (str "1.0" or int 1 — both legal per on-disk diversity;
                        bool is rejected even though True == 1)
        source         ∈ {"pipeline", "manual_drop", "pass_extraction"}
        status         ∈ {"candidate", "pinned", "canonical", "archived"}
                       (matches workspace SIDECAR_VALID_STATUSES frozen contract;
                        per Opus M3 fix — does NOT include "approved"/"rejected")
        created_at     (str — ISO timestamp)
        updated_at     (str — ISO timestamp)
        lineage        (dict, possibly empty)
        notes          (str, possibly empty)
        tags           (list, possibly empty)

    Args:
        payload: dict-like (or None); only `_audit_sidecar_dict` is read.
        modality, model: used only to prefix failure messages.
        shot: unused; accepted for the shared assertion signature.

    Raises:
        AssertionError: on the first field that is missing, has the wrong
            type, or carries a value outside its enum. Malformed values
            (including unhashable ones) are reported this way rather than
            escaping as TypeError.
    """
    sc = (payload or {}).get("_audit_sidecar_dict")
    if not isinstance(sc, dict):
        return  # soft no-op (not called via --inspect-sidecars)

    # Import canonical frozen-contract constants from workspace (data-contracts.md §1a).
    # Importing inline avoids a module-level workspace dependency in this audit module.
    from recoil.workspace.sidecar import (
        SIDECAR_VALID_SOURCES,
        SIDECAR_VALID_STATUSES,
    )

    # schema_version: accept BOTH "1.0" string and 1 int (legacy on-disk diversity).
    # bool is an int subclass (True == 1, same hash), so it must be excluded
    # explicitly or `"schema_version": true` would pass. A float 1.0 still
    # compares equal to 1 and stays accepted, as before.
    VALID_SCHEMA_VERSIONS = {"1.0", 1}
    sv = sc.get("schema_version")
    if isinstance(sv, bool) or not _is_allowed_value(sv, VALID_SCHEMA_VERSIONS):
        raise AssertionError(
            f"[{modality}|{model}] sidecar schema_version={sv!r} "
            f"not in {VALID_SCHEMA_VERSIONS} (housekeeping)"
        )

    # source enum — canonical workspace frozen contract.
    src = sc.get("source")
    if not _is_allowed_value(src, SIDECAR_VALID_SOURCES):
        raise AssertionError(
            f"[{modality}|{model}] sidecar source={src!r} not in {set(SIDECAR_VALID_SOURCES)}"
        )

    # status enum — canonical workspace frozen contract (Opus M3 fix:
    # "approved"/"rejected" are NOT valid workspace statuses).
    st = sc.get("status")
    if not _is_allowed_value(st, SIDECAR_VALID_STATUSES):
        raise AssertionError(
            f"[{modality}|{model}] sidecar status={st!r} not in {set(SIDECAR_VALID_STATUSES)}"
        )

    # Type checks, in the original order. A missing key yields None and is
    # reported as NoneType. Every message names the offending type so a
    # failure is diagnosable without re-opening the sidecar.
    expected_types = (
        ("created_at", str),
        ("updated_at", str),
        ("lineage", dict),
        ("notes", str),
        ("tags", list),
    )
    for field, expected in expected_types:
        value = sc.get(field)
        if not isinstance(value, expected):
            raise AssertionError(
                f"[{modality}|{model}] sidecar {field} is not {expected.__name__}: "
                f"{type(value).__name__}"
            )


# Ordered registry of the assertions that `run_all_assertions` applies to
# every modality other than "audio_t2a" and "lipsync_post". Order matters:
# the runner calls them sequentially with no exception handling, so the
# first AssertionError aborts the remaining checks.
# The trailing numbers are assertion ids. Ids 15 and 16 do not appear in this
# tuple (NOTE(review): presumably the audio / lipsync validators that
# `run_all_assertions` dispatches separately — confirm).
VIDEO_ASSERTIONS = (
    assert_no_exceptions,  # 1
    assert_reference_images_complete,  # 2
    assert_no_unhydrated_ref_tokens,  # 3
    assert_no_proper_nouns,  # 4
    assert_start_frame_serializable,  # 5
    assert_segment_timestamps_match,  # 6
    assert_canonical_filename,  # 7
    assert_model_not_unknown,  # 8
    assert_sidecar_populated,  # 9
    assert_audio_on_default_for_narrative,  # 10 (R4 inverted)
    assert_payload_json_serializable,  # 11
    assert_refs_exist_and_min_resolution,  # 12
    assert_budget_preflight_ok,  # 13
    assert_focal_length_opt_in,  # 14
    # R4 additions (CLI-surface assertions):
    assert_single_shot_tag_derivation,  # 17
    assert_single_shot_sidecar_populated,  # 18
    assert_shots_routes_to_r2v_multi,  # 19
    assert_every_segment_writes_sidecar,  # 20
    assert_generate_cli_dispatches,  # 21
    assert_run_overnight_dry_run_completes,  # 22
    # R5 additions (production-leak assertions):
    assert_multi_char_refs_hydrated,  # 23
    assert_dialogue_clause_present,  # 24
    assert_no_orphan_boundary_frames,  # 25
    assert_segmentation_gated_on_video_success,  # 26
    # R6 Phase 10 addition (sidecar housekeeping):
    assert_sidecar_housekeeping_fields_present,  # 27
)


def run_all_assertions(payload: dict, modality: str, model: str, kwargs: dict) -> None:
    """Run the audit assertions that apply to *modality* against *payload*.

    The shot handed to each assertion is `kwargs["shot"]`; when that is
    absent or None, the first element of `kwargs["shots"]` is used, or None
    if that list is missing or empty.

    "audio_t2a" and "lipsync_post" are each validated by one dedicated
    assertion. Every other modality runs the full `VIDEO_ASSERTIONS`
    sequence in order, with `kwargs` forwarded as a keyword argument.

    Raises:
        AssertionError: propagated from the first failing assertion.
    """
    shot = kwargs.get("shot")
    if shot is None:
        shot_list = kwargs.get("shots") or []
        if shot_list:
            shot = shot_list[0]

    if modality == "audio_t2a":
        assert_audio_t2a_payload_valid(payload, modality, model, shot)
    elif modality == "lipsync_post":
        assert_lipsync_post_payload_valid(payload, modality, model, shot)
    else:
        for check in VIDEO_ASSERTIONS:
            check(payload, modality, model, shot, kwargs=kwargs)
