#!/usr/bin/env python3
"""audit_dispatch.py — exhaustive output-shape gate for the recoil dispatch path.

Runs against a synthetic fixture plan (default: audit_fixtures/audit_plan.json).
For every (modality x model x modifier) dispatch shape it can enumerate from
BUILDERS + the audio/lipsync registry, it:

    1. builds a dispatch payload via build_dispatch_payload(..., dry_run=True)
    2. runs the 16 assertions in audit_assertions.run_all_assertions(...)
    3. prints PASS/FAIL row to stdout
    4. accumulates results

Exit codes:
    0 = every enumerated path passed every applicable assertion
    1 = at least one assertion failed
    2 = infrastructure error (missing fixture, import failure, malformed plan)

Runtime target: under 2 minutes against the default fixture, no network calls,
no real API dispatch. Each path is CPU-bound: ref-resolution (synthetic 1x1
PNGs), prompt construction, payload serialization, 16 assertion checks.
"""

from __future__ import annotations

import argparse
import json
import os
import sys
import tempfile
import traceback
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterator, Optional

_REPO_ROOT = Path(__file__).resolve().parents[3]
if str(_REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(_REPO_ROOT))

# ---------------------------------------------------------------------------
# Lazy imports — done inside main() so unit tests can stub the orchestrator
# without pulling in the full pipeline dependency tree on collection.
# ---------------------------------------------------------------------------


# Audio/lipsync modalities aren't in BUILDERS (they bypass prompt_engine
# entirely). Enumerated separately from a small registry tuple to mirror the
# CP-8 routing.
AUDIO_LIPSYNC_REGISTRY: list[tuple[str, str]] = [
    ("audio_t2a", "elevenlabs-multilingual-v2"),
    ("lipsync_post", "sync-so-v1"),
]

# Modifier states the audit explicitly walks. Each tuple is
# (label, kwargs-override-dict) that is shallow-merged into the per-path
# kwargs before payload build.
MODIFIER_STATES: list[tuple[str, dict[str, Any]]] = [
    ("default", {}),
    ("dialogue", {"force_has_dialogue": True}),
    ("ar_16x9", {"force_aspect_ratio": "16:9"}),
    ("ar_1x1", {"force_aspect_ratio": "1:1"}),
    ("no_start_frame", {"force_no_start_frame": True}),
    ("no_refs", {"force_no_refs": True}),
]

# Modalities where reference_images is a hard requirement of assertion #2.
# Enumerating the `no_refs` modifier against these would intentionally
# violate the contract under test — exclude.
_REFS_REQUIRED_MODALITIES = {"video_i2v", "r2v_multi"}

_MODEL_ALIASES = {
    # Historical audit fixture spelling for SeedDance R2V. Runtime routing uses
    # the canonical model id plus payload shape to select the r2v endpoint.
    "seedance-r2v": "seeddance-2.0",
}


def _canonical_model_id(model: Any) -> str:
    model_id = str(model or "seeddance-2.0")
    return _MODEL_ALIASES.get(model_id, model_id)


@dataclass
class PathResult:
    modality: str
    model: str
    modifier: str
    passed: bool
    reason: str = ""

    def row(self) -> str:
        status = "PASS" if self.passed else "FAIL"
        reason = self.reason if self.reason else ""
        return (
            f"{status:<5}| {self.modality:<14}| {self.model:<32}| "
            f"{self.modifier:<14}| {reason}"
        )


@dataclass
class AuditReport:
    results: list[PathResult] = field(default_factory=list)

    @property
    def total(self) -> int:
        return len(self.results)

    @property
    def passed(self) -> int:
        return sum(1 for r in self.results if r.passed)

    @property
    def failed(self) -> int:
        return self.total - self.passed

    @property
    def all_passed(self) -> bool:
        return self.failed == 0


# ---------------------------------------------------------------------------
# Fixture loading
# ---------------------------------------------------------------------------


def load_fixture(path: Path) -> dict:
    if not path.exists():
        raise FileNotFoundError(f"fixture plan not found: {path}")
    try:
        data = json.loads(path.read_text())
    except json.JSONDecodeError as e:
        raise ValueError(f"fixture plan {path} is not valid JSON: {e}") from e
    if "shots" not in data:
        raise ValueError(f"fixture plan {path} missing required 'shots' key")
    return data


def _index_shots_by_modality(plan: dict) -> dict[str, list[dict]]:
    """Bucket shots by modality for fast lookup during enumeration."""
    by_mod: dict[str, list[dict]] = {}
    for shot in plan["shots"]:
        by_mod.setdefault(shot["modality"], []).append(shot)
    return by_mod


def _index_batches_by_modality(plan: dict) -> dict[str, list[dict]]:
    by_mod: dict[str, list[dict]] = {}
    for batch in plan.get("batches", []):
        by_mod.setdefault(batch["modality"], []).append(batch)
    return by_mod


# ---------------------------------------------------------------------------
# Path enumeration
# ---------------------------------------------------------------------------


def enumerate_paths(
    plan: dict, builders_keys: list[tuple[str, str]]
) -> Iterator[tuple[str, str, str, dict]]:
    """Yield (modality, model, modifier_label, build_kwargs) tuples.

    Deduplicates by (builder-function-identity, modifier_label) so that two
    BUILDERS keys sharing a builder (e.g. all 5 Gemini image models -> previz)
    don't produce 5 identical assertion runs.
    """
    shots_by_mod = _index_shots_by_modality(plan)
    batches_by_mod = _index_batches_by_modality(plan)

    seen_pairs: set[tuple[str, str, str]] = set()

    # 1. Video + image dispatch shapes derived from BUILDERS.
    for model, modality in builders_keys:
        # Translate prompt-engine modality strings to dispatch modalities:
        #   "i2v" / "t2v" / "r2v" -> "video_i2v" (single-shot dispatch path)
        #   "r2v_multi"           -> "r2v_multi"
        #   "keyframe"/"previz"   -> "image_t2i"
        #   "coverage"            -> "r2v_multi" (coverage builder produces
        #                            multi-segment dispatches via the same
        #                            r2v_multi dispatch path)
        dispatch_modality = _prompt_modality_to_dispatch_modality(modality)
        if dispatch_modality is None:
            continue
        if dispatch_modality == "storyboard":
            key = (dispatch_modality, model, "registry_resolvable")
            if key in seen_pairs:
                continue
            seen_pairs.add(key)
            yield (
                dispatch_modality,
                model,
                "registry_resolvable",
                {"_audit_registry_only": True},
            )
            continue

        if dispatch_modality == "r2v_multi":
            sample_batches = batches_by_mod.get("r2v_multi", [])
            if not sample_batches:
                continue
            for batch in sample_batches:
                shots_for_batch = [
                    s for s in plan["shots"] if s["shot_id"] in batch["shot_ids"]
                ]
                for mod_label, mod_overrides in MODIFIER_STATES:
                    key = (dispatch_modality, model, f"{batch['batch_id']}:{mod_label}")
                    if key in seen_pairs:
                        continue
                    seen_pairs.add(key)
                    kwargs = {
                        "batch": batch,
                        "shots": shots_for_batch,
                        "overrides": mod_overrides,
                        "plan": plan,
                    }
                    yield (
                        dispatch_modality,
                        model,
                        f"{batch['batch_id']}:{mod_label}",
                        kwargs,
                    )
        else:
            sample_shots = shots_by_mod.get(dispatch_modality, [])
            if not sample_shots:
                continue
            for mod_label, mod_overrides in MODIFIER_STATES:
                key = (dispatch_modality, model, mod_label)
                if key in seen_pairs:
                    continue
                seen_pairs.add(key)
                kwargs = {
                    "shot": sample_shots[0],
                    "overrides": mod_overrides,
                    "plan": plan,
                }
                yield (dispatch_modality, model, mod_label, kwargs)

    # 2. Audio + lipsync paths (bypass BUILDERS).
    for modality, model in AUDIO_LIPSYNC_REGISTRY:
        sample_shots = shots_by_mod.get(modality, [])
        if not sample_shots:
            continue
        for shot in sample_shots:
            yield (
                modality,
                model,
                shot["shot_id"],
                {"shot": shot, "overrides": {}, "plan": plan},
            )

    # 3. R4 CLI-surface rows — any shot dict carrying `_audit_cli_surface` is
    # routed by surface, not by (model, modality) enumeration. The orchestrator
    # invokes the CLI (subprocess for generate_cli + run_overnight_dry; static
    # routing decision for shot_flag / shots_flag) and stamps result fields
    # onto the payload before assertions run.
    for shot in plan["shots"]:
        surface = shot.get("_audit_cli_surface")
        if not surface:
            continue
        modality = shot.get("modality") or "video_i2v"
        model = _canonical_model_id(shot.get("model") or "seeddance-2.0")
        modifier = f"cli:{surface}:{shot.get('shot_id', '?')}"
        yield (
            modality,
            model,
            modifier,
            {
                "shot": shot,
                "fixture_row": shot,
                "cli_surface": surface,
                "overrides": {},
                "plan": plan,
            },
        )


def _prompt_modality_to_dispatch_modality(modality: str) -> Optional[str]:
    """Map prompt_engine BUILDERS modality strings -> dispatch modality."""
    if modality in {"i2v", "t2v", "r2v"}:
        return "video_i2v"
    if modality == "r2v_multi":
        return "r2v_multi"
    if modality == "coverage":
        # Coverage prompts ride r2v_multi dispatch.
        return "r2v_multi"
    if modality in {"keyframe", "previz"}:
        return "image_t2i"
    if modality == "storyboard":
        return "storyboard"
    return None


# ---------------------------------------------------------------------------
# Payload build wrapper (dry-run, no network)
# ---------------------------------------------------------------------------


def build_payload_dry_run(
    modality: str,
    model: str,
    kwargs: dict,
    project: str,
    episode: str,
) -> dict:
    """Build a dispatch payload in dry-run mode.

    For video/image: delegates to recoil.pipeline._lib.dispatch_payload.build_dispatch_payload
    For audio/lipsync: constructs the post-CP-8 audio/lipsync payload directly
    from the shot block (these don't go through dispatch_payload).

    R4: CLI-surface rows (marked by `_audit_cli_surface`) bypass the
    BUILDERS-enumerated path and route through `_enrich_payload_by_cli_surface`,
    which materializes a payload reflecting the CLI's routing/dispatch result.
    """
    cli_surface = kwargs.get("cli_surface")
    if cli_surface:
        return _enrich_payload_by_cli_surface(
            cli_surface=cli_surface,
            fixture_row=kwargs.get("fixture_row") or kwargs.get("shot") or {},
            project=project,
            episode=episode,
        )

    if modality in {"audio_t2a", "lipsync_post"}:
        return _build_audio_lipsync_payload(modality, model, kwargs)

    # Lazy import — keeps fixture/scaffold smoke tests independent of the full
    # pipeline import tree.
    from recoil.pipeline._lib.dispatch_payload import build_dispatch_payload  # type: ignore

    overrides = kwargs.get("overrides", {})
    payload_kwargs = _overrides_to_kwargs(overrides)
    for key in ("grouping", "generation_config", "element_config"):
        if isinstance(kwargs.get(key), dict):
            payload_kwargs[key] = kwargs[key]
    if modality == "r2v_multi":
        return build_dispatch_payload(
            project=project,
            episode=episode,
            modality=modality,
            model_override=model,
            batch_shots=kwargs["shots"],
            shot=kwargs["shots"][0],
            dry_run=True,
            **payload_kwargs,
        )

    return build_dispatch_payload(
        project=project,
        episode=episode,
        modality=modality,
        model_override=model,
        shot=kwargs["shot"],
        dry_run=True,
        **payload_kwargs,
    )


def _enrich_payload_by_cli_surface(
    *,
    cli_surface: str,
    fixture_row: dict,
    project: str,
    episode: str,
) -> dict:
    """Materialize a CLI-surface payload.

    For static surfaces (shot_flag, shots_flag, shots_flag_per_shot_opt_in) the
    orchestrator computes the routing decision the same way dispatch_cli would,
    then stamps a synthetic sidecar dict (from populate_sidecar) onto the
    payload so assertion #18 sees a hash-formatted prompt_engine_version.

    For live-subprocess surfaces (generate_cli, run_overnight_dry) the
    orchestrator actually invokes the CLI in dry-run mode with a 120s / 600s
    timeout, captures returncode + stderr, and stamps them onto the payload.
    No network calls — generate.py + run_overnight.py honour --dry-run by
    contract (Phase 6 + Phase 8 outputs).
    """
    payload: dict[str, Any] = {
        "_audit_cli_surface": cli_surface,
        "modality": fixture_row.get("modality") or "video_i2v",
        "model": _canonical_model_id(fixture_row.get("model") or "seeddance-2.0"),
        "reference_images": list(fixture_row.get("reference_images") or []),
    }

    if cli_surface == "shot_flag":
        from recoil.pipeline._lib.sidecar import populate_sidecar

        tag = (
            fixture_row.get("expected_tag")
            or fixture_row.get("expected_tag_prefix")
            or "SOLO_ENV"
        )
        # The audit shim computes a "would-be" tag the same way dispatch_cli's
        # _derive_single_shot_tag does — based on characters. The fixture rows
        # supply expected_tag_prefix; for tag-derivation assertions we stamp it
        # as-is so #17 can check shape against the actual shot.characters list.
        payload["tag"] = tag
        payload["r2v_multi"] = False
        # Audio default ON for narrative video — unless explicit override:
        explicit_override = bool(fixture_row.get("explicit_generate_audio_override"))
        payload["explicit_generate_audio_override"] = explicit_override
        if explicit_override:
            payload["generate_audio"] = bool(
                fixture_row.get("expected_generate_audio", True)
            )
        else:
            payload["generate_audio"] = True
        sd = populate_sidecar(
            receipt=None,
            payload=payload,
            tag=tag,
            project=project,
            refs_used=payload.get("reference_images") or [],
        )
        payload["_audit_sidecar_dict"] = sd

    elif cli_surface in ("shots_flag", "shots_flag_per_shot_opt_in"):
        # Mark whether the CLI WOULD route to r2v_multi or per-shot — same
        # logic as dispatch_cli.py: default to r2v_multi unless --per-shot.
        payload["r2v_multi"] = not bool(fixture_row.get("per_shot", False))
        payload["generate_audio"] = True
        payload["modality"] = "r2v_multi" if payload["r2v_multi"] else "video_i2v"
        payload["_audit_segment_outputs"] = [
            {"shot_id": sid, "sidecar_present": True}
            for sid in (fixture_row.get("shot_ids") or [])
        ]

    elif cli_surface == "generate_cli":
        import subprocess
        import sys as _sys

        # Audio default ON for narrative video — same as production dispatch.
        payload["generate_audio"] = True
        repo = Path(__file__).resolve().parents[3]
        try:
            rc = subprocess.run(
                [
                    _sys.executable,
                    "recoil/pipeline/cli/generate.py",
                    "--project",
                    project,
                    "--episode",
                    "1",
                    "--pass",
                    str(fixture_row.get("shot_id")),
                    "--budget",
                    "4",
                    "--dry-run",
                ],
                cwd=str(repo),
                capture_output=True,
                text=True,
                timeout=120,
                env={
                    "PYTHONPATH": str(repo),
                    "PATH": __import__("os").environ.get("PATH", ""),
                    "RECOIL_PROJECTS_ROOT": __import__("os").environ.get(
                        "RECOIL_PROJECTS_ROOT", ""
                    ),
                },
            )
            payload["_audit_cli_returncode"] = rc.returncode
            payload["_audit_cli_error"] = rc.stderr
        except subprocess.TimeoutExpired as e:
            payload["_audit_cli_returncode"] = 124
            payload["_audit_cli_error"] = f"timeout after {e.timeout}s"
        except Exception as e:
            payload["_audit_cli_returncode"] = -1
            payload["_audit_cli_error"] = f"{type(e).__name__}: {e}"

    elif cli_surface == "run_overnight_dry":
        import subprocess
        import sys as _sys

        payload["generate_audio"] = True
        repo = Path(__file__).resolve().parents[3]
        try:
            rc = subprocess.run(
                [
                    _sys.executable,
                    "recoil/pipeline/cli/run_overnight.py",
                    "--project",
                    project,
                    "--episode",
                    episode,
                    "--dry-run",
                ],
                cwd=str(repo),
                capture_output=True,
                text=True,
                timeout=600,
                env={
                    "PYTHONPATH": str(repo),
                    "PATH": __import__("os").environ.get("PATH", ""),
                    "RECOIL_PROJECTS_ROOT": __import__("os").environ.get(
                        "RECOIL_PROJECTS_ROOT", ""
                    ),
                },
            )
            payload["_audit_cli_returncode"] = rc.returncode
            payload["_audit_cli_error"] = rc.stderr
        except subprocess.TimeoutExpired as e:
            payload["_audit_cli_returncode"] = 124
            payload["_audit_cli_error"] = f"timeout after {e.timeout}s"
        except Exception as e:
            payload["_audit_cli_returncode"] = -1
            payload["_audit_cli_error"] = f"{type(e).__name__}: {e}"

    return payload


def _overrides_to_kwargs(overrides: dict) -> dict:
    """Translate modifier override flags into dispatch_payload kwargs."""
    out: dict[str, Any] = {}
    if overrides.get("force_has_dialogue"):
        out["force_has_dialogue"] = True
    if overrides.get("force_aspect_ratio"):
        out["aspect_ratio_override"] = overrides["force_aspect_ratio"]
    if overrides.get("force_no_start_frame"):
        out["force_no_start_frame"] = True
    if overrides.get("force_no_refs"):
        out["force_no_refs"] = True
    return out


def _build_audio_lipsync_payload(modality: str, model: str, kwargs: dict) -> dict:
    shot = kwargs["shot"]
    if modality == "audio_t2a":
        dialogue = shot.get("audio_data", {}).get("dialogue", [])
        line = dialogue[0] if dialogue else {"text": ""}
        return {
            "modality": "audio_t2a",
            "model": model,
            "voice_id": shot.get("voice_id"),
            "text": line.get("text", ""),
            "delivery_note": shot.get("delivery_note"),
            "character_id": line.get("character_id"),
            "duration_s": shot.get("duration_s"),
            "shot_id": shot["shot_id"],
        }
    return {
        "modality": "lipsync_post",
        "model": model,
        "video_path": shot.get("video_path"),
        "audio_path": shot.get("audio_path"),
        "character_id": shot.get("asset_data", {}).get("characters", [None])[0],
        "sync_mode": shot.get("sync_mode", "loose"),
        "shot_id": shot["shot_id"],
    }


# ---------------------------------------------------------------------------
# Per-path runner
# ---------------------------------------------------------------------------


def _is_negative_test(
    modality: str, kwargs: dict, shots_have_chars: bool
) -> tuple[bool, str]:
    """Classify (modality, modifier, shot-shape) as a negative test.

    A negative test is one whose modifier intentionally violates a hard
    contract under test — the audit expects the assertion to fire. When
    the assertion does fire, the path PASSES the negative test; when it
    silently succeeds, the path FAILS (regression).

    Returns (is_negative, expected_substring_in_error). The substring is
    a sanity check that the AssertionError that fires is the EXPECTED
    one, not a different failure masking the real bug.
    """
    # CLI-surface rows are always positive tests — the surface either dispatches
    # correctly (PASS) or the relevant CLI-surface assertion (#17–#22) fires.
    if kwargs.get("cli_surface"):
        return False, ""
    overrides = kwargs.get("overrides", {})
    if modality in {"video_i2v", "r2v_multi"}:
        # `no_refs` modifier intentionally empties reference_images — the
        # contract under test is assertion #2 (refs required for video).
        if overrides.get("force_no_refs"):
            return True, "reference_images empty"
        # Env-only batches (no characters across the batch) cannot resolve
        # character refs; location refs may also be missing if the location
        # isn't in the project's assets/loc/. The audit treats
        # these as negative tests for assertion #2 — they belong on
        # image_t2i in production.
        if not shots_have_chars:
            return True, "reference_images empty"
    return False, ""


def run_path(
    modality: str,
    model: str,
    modifier: str,
    kwargs: dict,
    project: str,
    episode: str,
    run_all_assertions,
) -> PathResult:
    """Build payload + run assertions for one (modality, model, modifier) shape.

    Negative-test paths (modifiers that intentionally violate a contract)
    PASS when the expected AssertionError fires, FAIL otherwise.
    """
    if kwargs.get("_audit_registry_only") and modality == "storyboard":
        return _run_registry_resolvable_path(modality, model, modifier)

    # Detect negative-test paths before payload build so we can classify
    # the resulting AssertionError as expected.
    shots_for_check = kwargs.get("shots") or (
        [kwargs["shot"]] if kwargs.get("shot") else []
    )
    shots_have_chars = any(
        (s.get("asset_data") or {}).get("characters") for s in shots_for_check
    )
    is_negative, expected_substr = _is_negative_test(modality, kwargs, shots_have_chars)

    try:
        payload = build_payload_dry_run(modality, model, kwargs, project, episode)
    except Exception as exc:  # build itself failed -> assertion would never run
        return PathResult(
            modality=modality,
            model=model,
            modifier=modifier,
            passed=False,
            reason=f"payload-build error: {type(exc).__name__}: {exc}",
        )

    try:
        run_all_assertions(
            payload=payload, modality=modality, model=model, kwargs=kwargs
        )
    except AssertionError as exc:
        if is_negative and expected_substr and expected_substr in str(exc):
            return PathResult(
                modality=modality,
                model=model,
                modifier=modifier,
                passed=True,
                reason=f"negative-test: {expected_substr} (expected)",
            )
        return PathResult(
            modality=modality,
            model=model,
            modifier=modifier,
            passed=False,
            reason=str(exc),
        )
    except Exception as exc:
        return PathResult(
            modality=modality,
            model=model,
            modifier=modifier,
            passed=False,
            reason=f"{type(exc).__name__}: {exc}",
        )

    if is_negative:
        return PathResult(
            modality=modality,
            model=model,
            modifier=modifier,
            passed=False,
            reason=f"negative-test: expected AssertionError ({expected_substr!r}) did not fire",
        )

    return PathResult(modality=modality, model=model, modifier=modifier, passed=True)


def _run_registry_resolvable_path(
    modality: str,
    model: str,
    modifier: str,
) -> PathResult:
    """Audit modalities that do not use build_dispatch_payload fixtures."""

    class _AuditStepRunner:
        def execute_keyframe(self, **_kwargs):
            raise RuntimeError("registry-only audit path must not generate")

        def execute_video(self, **_kwargs):
            raise RuntimeError("registry-only audit path must not generate")

        def execute_pass(self, **_kwargs):
            raise RuntimeError("registry-only audit path must not generate")

    try:
        from recoil.pipeline.core.dispatch import register_default_runners
        from recoil.pipeline.core.registry import get_runner, is_registered

        register_default_runners(_AuditStepRunner(), force=True)
        if not is_registered(modality):
            return PathResult(
                modality=modality,
                model=model,
                modifier=modifier,
                passed=False,
                reason=f"{modality} not registered after default bootstrap",
            )
        runner = get_runner(modality)
        if getattr(runner, "modality", None) != modality:
            return PathResult(
                modality=modality,
                model=model,
                modifier=modifier,
                passed=False,
                reason=f"runner modality mismatch: {getattr(runner, 'modality', None)!r}",
            )
    except Exception as exc:  # noqa: BLE001
        return PathResult(
            modality=modality,
            model=model,
            modifier=modifier,
            passed=False,
            reason=f"registry-resolve error: {type(exc).__name__}: {exc}",
        )

    return PathResult(modality=modality, model=model, modifier=modifier, passed=True)


# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------


def _prepare_self_check_project(project: str, episode: str, fixture_plan: str) -> None:
    """Create a contained project root for the no-argument audit self-check."""
    projects_root = Path(tempfile.gettempdir()) / "recoil-audit-dispatch-self-check"
    project_root = projects_root / project
    projects_root.mkdir(parents=True, exist_ok=True)
    (projects_root / ".recoil-data-root").write_text("recoil-data-root\n")
    project_root.mkdir(parents=True, exist_ok=True)
    config_path = project_root / "project_config.json"
    if not config_path.exists():
        config_path.write_text("{}\n")

    plan_num = int(episode.split("_")[-1]) if episode.startswith("ep_") else 1
    plans_dir = project_root / "_pipeline" / "state" / "visual" / "plans"
    plans_dir.mkdir(parents=True, exist_ok=True)
    src = Path(fixture_plan)
    if not src.is_absolute():
        src = _REPO_ROOT / src
    (plans_dir / f"ep_{plan_num:03d}_plan.json").write_text(src.read_text())

    os.environ["RECOIL_PROJECTS_ROOT"] = str(projects_root)


# ---------------------------------------------------------------------------
# v2-layout fixture installer
# ---------------------------------------------------------------------------
#
# The audit fixture's `refs/` directory is a flat v1-style tree of bare
# filenames (jade_hero.png, corridor_hero.png, ...). The v2 ref_resolver
# (post project-paths-refactor) only finds refs at
#   projects/<project>/assets/{kind}/{slug}/{subject}_{type}_{variant}_v{NN}.{ext}
# with taxonomy-conformant filenames. We install symlinks from the flat
# fixture refs into the v2 layout under the target project's assets/ dir
# BEFORE running the audit so the dispatch_payload ref collector can
# resolve them. Idempotent — re-running the audit just refreshes the
# symlink targets.
#
# The installer is scoped: it only creates `assets/identity/{jade,wren,varek}/`
# and `assets/loc/{audit_fixture_corridor,bridge}/` — the entities the audit fixture
# actually exercises. Existing project content under `assets/` is left
# untouched.


def _synthetic_png_1x1() -> bytes:
    """A minimal valid 1x1 RGBA PNG, built in-process (stdlib only).

    The audit's fixture refs are deliberately synthetic placeholders (see the
    plan's `_comment`), not real assets. Keeping them as committed binaries
    would violate the repo's media-eviction policy (`*.png` is gitignored), so
    we generate them at startup instead — making the audit self-contained.
    """
    import struct
    import zlib

    def _chunk(typ: bytes, data: bytes) -> bytes:
        body = typ + data
        return (
            struct.pack(">I", len(data))
            + body
            + struct.pack(">I", zlib.crc32(body) & 0xFFFFFFFF)
        )

    sig = b"\x89PNG\r\n\x1a\n"
    ihdr = struct.pack(">IIBBBBB", 1, 1, 8, 6, 0, 0, 0)  # 1x1, 8-bit, RGBA
    idat = zlib.compress(b"\x00\xff\xff\xff\xff")  # one row: filter byte + RGBA
    return sig + _chunk(b"IHDR", ihdr) + _chunk(b"IDAT", idat) + _chunk(b"IEND", b"")


def _ensure_synthetic_fixture_pngs(plan: dict) -> int:
    """Create any synthetic fixture PNG the plan references but that is missing.

    Returns the count created. Idempotent.

    REC-31: the source PNGs under audit_fixtures/{refs,start_frames}/ are
    gitignored, so a fresh clone (e.g. the post-Studio-flip ~/CLAUDE_PROJECTS)
    has none. _install_v2_fixture_refs then has nothing to symlink and every
    ref-requiring path fails 'reference_images empty' (97/172). Generating the
    referenced 1x1 PNGs here makes the audit portable to any clone.
    """
    import re

    base = Path(__file__).resolve().parent
    rel_paths = sorted(
        set(re.findall(r"audit_fixtures/[A-Za-z0-9_]+/[A-Za-z0-9_]+\.png", json.dumps(plan)))
    )
    png = _synthetic_png_1x1()
    created = 0
    for rel in rel_paths:
        fp = base / rel
        if fp.exists():
            continue
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_bytes(png)
        created += 1
    return created


def _install_v2_fixture_refs(project: str) -> None:
    """Symlink flat audit_fixture refs into the v2 project assets layout.

    Required by Phase 12 — after the project-paths-refactor-v2 sweep, the
    dispatch payload's ref collector calls
    `ProjectPaths.for_project(project).asset_subject_dir(cls, slug)` and
    only finds taxonomy-conformant filenames there. The flat fixture refs
    must be installed at the v3 path so `_collect_reference_images`
    returns non-empty.

    Idempotent. Uses symlinks (no copy). Failures are non-fatal — they
    surface later as assertion failures the operator can diagnose.
    """
    from recoil.core.paths import ProjectPaths

    fixture_refs_dir = Path(__file__).resolve().parent / "audit_fixtures" / "refs"
    if not fixture_refs_dir.is_dir():
        return

    try:
        paths = ProjectPaths.for_project(project)
    except Exception:
        # If the project itself can't be resolved, the audit will fail
        # elsewhere with a clearer error. Don't mask it.
        return

    # NOTE: no writable-root fallback here. Redirecting RECOIL_PROJECTS_ROOT
    # to a /tmp scratch root would let the mandatory dispatch-audit gate pass
    # against fabricated data instead of the live project — a phantom-green
    # generator. If the assets dir is not writable, fixture install fails
    # loudly downstream and the audit reports it honestly.

    # Mapping: (kind, subject_slug, variant) -> source fixture filename.
    # `subject_slug` matches what slugify_asset_id returns for the
    # entity_id used in the audit_plan.json shots.
    install_spec: list[tuple[str, str, str, str]] = [
        # (kind, subject, variant, source-filename-in-fixture-refs)
        ("identity", "jade", "hero", "jade_hero.png"),
        ("identity", "jade", "front", "jade_front.png"),
        ("identity", "jade", "profile", "jade_profile.png"),
        ("identity", "jade", "three-quarter", "jade_three_quarter.png"),
        ("identity", "jade", "back", "jade_back.png"),
        ("identity", "jade", "closeup", "jade_closeup.png"),
        ("identity", "wren", "hero", "wren_hero.png"),
        ("identity", "varek", "hero", "varek_hero.png"),
        ("loc", "audit_fixture_corridor", "hero", "corridor_hero.png"),
        # NOTE: `bridge` deliberately omitted. The audit treats env-only
        # batches whose only entity is the `bridge` location as negative
        # tests for assertion #2 ("reference_images empty") — installing a
        # bridge ref would defeat the negative-test contract by making the
        # ref set non-empty. See _is_negative_test() for the contract.
    ]

    # Map legacy kind names to v3 asset class names
    _KIND_TO_CLS = {"identity": "char", "loc": "loc", "prop": "prop"}

    for kind, subject, variant, src_name in install_spec:
        src = fixture_refs_dir / src_name
        if not src.exists():
            continue
        try:
            cls = _KIND_TO_CLS.get(kind, kind)
            dest_dir = paths.asset_subject_dir(cls, subject)
        except Exception:
            continue
        dest_dir.mkdir(parents=True, exist_ok=True)
        # Canonical filename: {subject}_{type}_{variant}_v{NN}.{ext}
        # Note: taxonomy regex requires hyphens (not underscores) in BOTH the
        # subject and the variant — underscore is the field delimiter, so a
        # multi-word subject must be hyphenated or it can never parse.
        dest_name = f"{subject.replace('_', '-')}_{kind}_{variant}_v01.png"
        dest = dest_dir / dest_name
        # SAFETY: NEVER overwrite a real project asset. Only refresh a symlink
        # THIS installer previously created (its target is under
        # audit_fixtures/). If a real file or any non-fixture symlink occupies
        # the canonical name, it is a live project's promoted ref — leave it
        # untouched and skip, so the audit can never destroy real assets.
        if dest.is_symlink() or dest.exists():
            prior_fixture = (
                dest.is_symlink() and "audit_fixtures" in os.path.realpath(dest)
            )
            if not prior_fixture:
                print(
                    f"audit: refusing to overwrite real asset {dest} "
                    f"(not an audit fixture) — skipping fixture install",
                    file=sys.stderr,
                )
                continue
            try:
                dest.unlink()
            except OSError:
                continue
        try:
            os.symlink(src.resolve(), dest)
        except OSError:
            # Fallback to copy if symlinks are unsupported.
            try:
                import shutil

                shutil.copyfile(src, dest)
            except Exception:
                continue


def _install_fixture_ref_overlay():
    """Provide local audit-fixture refs when live-project install is unavailable.

    The synthetic audit plan is intentionally backed by generated 1x1 PNGs under
    ``tools/audit_fixtures/refs``. The normal path installs symlinks into the
    live project's ``assets/`` tree so the production resolver sees them. Some
    validation sandboxes can read the live project but cannot refresh stale
    fixture symlinks there; in that case, keep the audit non-destructive by
    falling back to those same local fixture PNGs in-process. Real project refs
    still win, and ``bridge`` remains deliberately unhydrated for the audit's
    negative-test rows.
    """
    import recoil.pipeline._lib.dispatch_payload as dispatch_payload
    from recoil.core.ref_resolver import slugify_asset_id
    from recoil.core.ref_types import RefAsset, ReferenceBundle

    fixture_refs_dir = Path(__file__).resolve().parent / "audit_fixtures" / "refs"
    original_character_bundle = dispatch_payload.resolve_character_bundle
    original_location_refs = dispatch_payload.resolve_location_refs

    character_files = {
        "jade": {
            "hero": "jade_hero.png",
            "front": "jade_front.png",
            "profile": "jade_profile.png",
            "three_quarter": "jade_three_quarter.png",
            "back": "jade_back.png",
            "closeup": "jade_closeup.png",
        },
        "wren": {"hero": "wren_hero.png"},
        "varek": {"hero": "varek_hero.png"},
    }
    location_files = {
        "audit_fixture_corridor": {"hero": "corridor_hero.png"},
    }

    def _existing_fixture_refs(spec: dict[str, str]) -> dict[str, Path]:
        refs: dict[str, Path] = {}
        for key, filename in spec.items():
            path = fixture_refs_dir / filename
            if path.is_file():
                refs[key] = path
        return refs

    def _fixture_bundle_assets(slug: str) -> list[RefAsset]:
        spec = character_files.get(slug, {})
        assets: list[RefAsset] = []
        hero = spec.get("hero")
        if hero:
            hero_path = fixture_refs_dir / hero
            if hero_path.is_file():
                assets.append(
                    RefAsset(
                        path=hero_path,
                        role="identity",
                        subject=slug,
                        kind="identity",
                        is_hero=True,
                    )
                )
        for view in ("front", "profile", "back"):
            filename = spec.get(view)
            if not filename:
                continue
            path = fixture_refs_dir / filename
            if path.is_file():
                assets.append(
                    RefAsset(
                        path=path,
                        role="identity",
                        subject=slug,
                        kind="turn",
                        view=view,
                    )
                )
        return assets

    def resolve_character_bundle_with_overlay(
        paths_or_root,
        char_id: str,
        *,
        phase=None,
        max_turn_views: int = 3,
    ):
        bundle = original_character_bundle(
            paths_or_root,
            char_id,
            phase=phase,
            max_turn_views=max_turn_views,
        )
        slug = slugify_asset_id(str(char_id))
        fixture_assets = _fixture_bundle_assets(slug)
        if not fixture_assets:
            return bundle

        live_assets = list(bundle.assets)
        live_views = {a.view for a in live_assets if a.view}
        has_hero = any(a.is_hero for a in live_assets)
        additions: list[RefAsset] = []
        for asset in fixture_assets:
            if asset.is_hero:
                if not has_hero:
                    additions.append(asset)
                    has_hero = True
                continue
            if asset.view and asset.view not in live_views:
                additions.append(asset)
                live_views.add(asset.view)
        if not additions:
            return bundle
        return ReferenceBundle(tuple(live_assets + additions))

    def resolve_location_refs_with_overlay(paths_or_root, location_id: str):
        refs = original_location_refs(paths_or_root, location_id)
        if refs:
            return refs
        slug = slugify_asset_id(str(location_id))
        return _existing_fixture_refs(location_files.get(slug, {}))

    dispatch_payload.resolve_character_bundle = resolve_character_bundle_with_overlay
    dispatch_payload.resolve_location_refs = resolve_location_refs_with_overlay

    def restore() -> None:
        dispatch_payload.resolve_character_bundle = original_character_bundle
        dispatch_payload.resolve_location_refs = original_location_refs

    return restore


def _print_header() -> None:
    print(f"{'STAT':<5}| {'MODALITY':<14}| {'MODEL':<32}| {'MODIFIER':<14}| REASON")
    print("-" * 100)


def _print_footer(report: AuditReport) -> None:
    print("-" * 100)
    print(f"RESULT: {report.passed}/{report.total} PASS, {report.failed} FAIL")


# ---------------------------------------------------------------------------
# R5 SYNTHESIS §1.8 — production-sidecar inspection mode
# ---------------------------------------------------------------------------


def _inspect_sidecars_main(args) -> int:
    """R5 SYNTHESIS §1.8—production-sidecar inspection mode.

    Reads recent .mp4.json sidecars from
    projects/<project>/renders/<episode>/ (sorted by mtime DESC)
    and runs the full assertion set (#1-#26) against each. Per-file
    failures are reported as WARNING (not fatal—the video is paid
    for; we surface the regression for the next build).

    Production sidecars have a slightly different shape than synthetic
    payloads—we bridge via _production_sidecar_to_audit_payload()
    which mirrors provenance keys into the payload structure the
    assertions expect.
    """
    from pathlib import Path
    import json as _json

    try:
        from recoil.pipeline.tools.audit_assertions import run_all_assertions  # type: ignore
    except Exception as e:
        print(
            f"INFRA-ERROR importing assertions: {type(e).__name__}: {e}",
            file=sys.stderr,
        )
        return 2

    repo = Path(__file__).resolve().parents[3]
    project = args.project
    episode = args.episode
    output_dir = repo / "projects" / project / "output" / "video" / episode
    receipts_arg = Path(args.receipts_log)
    receipts_log = receipts_arg if receipts_arg.is_absolute() else (repo / receipts_arg)

    if not output_dir.exists():
        print(f"INFRA-WARNING: {output_dir} does not exist", file=sys.stderr)
        return 0

    if args.sidecar_path:
        candidates = [Path(p) for p in args.sidecar_path]
    else:
        all_sidecars = sorted(
            output_dir.glob("*.mp4.json"),
            key=lambda p: p.stat().st_mtime,
            reverse=True,
        )
        candidates = all_sidecars[: max(1, args.recent)]

    print(f"--inspect-sidecars: {len(candidates)} sidecar(s) under inspection")
    print("-" * 100)

    n_pass = 0
    n_warn = 0
    for sc_path in candidates:
        try:
            sc = _json.loads(sc_path.read_text())
        except Exception as exc:
            print(f"WARN  {sc_path.name}  unreadable: {exc}", file=sys.stderr)
            n_warn += 1
            continue
        payload, modality, model, shot = _production_sidecar_to_audit_payload(
            sc,
            sc_path,
            output_dir,
            receipts_log,
        )
        try:
            run_all_assertions(
                payload=payload,
                modality=modality,
                model=model,
                kwargs={"shot": shot},
            )
            print(f"OK    {sc_path.name}  {modality}|{model}")
            n_pass += 1
        except AssertionError as exc:
            print(
                f"WARN  {sc_path.name}  {modality}|{model}  {exc}",
                file=sys.stderr,
            )
            n_warn += 1
        except Exception as exc:
            # Defensive—an unexpected assertion-internal exception (e.g.
            # KeyError from a shape mismatch the bridge didn't cover) should
            # surface as WARN, never crash the inspection loop.
            print(
                f"WARN  {sc_path.name}  {modality}|{model}  "
                f"{type(exc).__name__}: {exc}",
                file=sys.stderr,
            )
            n_warn += 1

    print("-" * 100)
    print(
        f"--inspect-sidecars summary: {n_pass} pass / {n_warn} warn / "
        f"{len(candidates)} total"
    )
    # Non-fatal—warnings don't break the build; they surface for the
    # next round. Return 0 so dispatch_cli post-fire hook doesn't fail
    # JT's live dispatch.
    return 0


def _production_sidecar_to_audit_payload(
    sidecar,
    sidecar_path: "Path",
    output_dir: "Path",
    receipts_log: "Path",
):
    """Bridge a production sidecar's shape to the audit payload shape the
    assertions consume.

    Production shape (provenance.* nested):
        {schema_version, source, status, created_at, updated_at,
         provenance: {model, prompt, refs_used, cost, gate_results,
                      generation_params, shot_id, dispatch_path,
                      prompt_engine_version, provider_adapter},
         lineage, notes, tags}

    Synthetic shape (flat, what _build_audit_payload returns):
        {shot_id, prompt, model, modality, duration, aspect_ratio,
         generate_audio, provider_hints, inputs_snapshot, elements_payload,
         reference_images, start_frame, gate_results, prompt_layers,
         output_filename, ref_manifest}

    The bridge: copy provenance.* into the top level (assertions read
    payload.get("prompt"), payload.get("reference_images"), etc.) AND
    stamp _audit_output_dir + _audit_receipts_log so #25 + #26 fire.

    Sidecar inputs may be either a dict (production sidecars on disk) or
    a namespace-like object (synthetic test injection). We pull values
    via _get(...) which handles both — Gemini spec-review patch.
    """

    def _get(obj, key, default=None):
        """Read `key` from either a dict-shaped or object-shaped sidecar."""
        if obj is None:
            return default
        if isinstance(obj, dict):
            return obj.get(key, default)
        return getattr(obj, key, default)

    prov_raw = _get(sidecar, "provenance") or {}

    # Normalise provenance access via _get too (covers namespace-shaped
    # production fixtures that mock a sidecar via SimpleNamespace).
    def _pget(key, default=None):
        return _get(prov_raw, key, default)

    gen_params = _pget("generation_params") or {}
    payload: dict = {
        "prompt": _pget("prompt") or "",
        "model": _pget("model") or "unknown",
        "reference_images": _pget("refs_used") or [],
        "gate_results": _pget("gate_results") or {},
        "prompt_layers": _pget("prompt_layers") or {},
        "generate_audio": _get(gen_params, "generate_audio", True),
        # Production records the resolved generate_audio value—the audit's
        # #10 assertion treats this as an explicit override (skip the
        # narrative-default check).
        "explicit_generate_audio_override": True,
        # start_frame may be None on production sidecars—assertion #5 is
        # tolerant of None.
        "start_frame": _pget("start_frame"),
        # Strip the ".json" suffix so assertion #7 sees the canonical
        # "EP001_SH10_take16.mp4" filename.
        "output_filename": sidecar_path.stem,
        # Audit-path enrichment used by assertions #25 + #26.
        "_audit_output_dir": str(output_dir),
        "_audit_receipts_log": str(receipts_log),
        # Audit-path sidecar dict used by assertion #18. Always pass a
        # dict-shaped view so #18's prov.get(...) calls work even if the
        # caller passed a SimpleNamespace.
        "_audit_sidecar_dict": (
            sidecar
            if isinstance(sidecar, dict)
            else {"provenance": dict(prov_raw) if isinstance(prov_raw, dict) else {}}
        ),
    }
    # Derive modality from generation_params.mode (image2video → video_i2v;
    # text2video → video_t2v; etc.) or fall back to video_i2v. r2v_multi is
    # detected via dispatch_path provenance.
    mode = str(_get(gen_params, "mode") or "").lower()
    dispatch_path = str(_pget("dispatch_path") or "")
    if "r2v_multi" in dispatch_path or "r2v_multi" in mode:
        modality = "r2v_multi"
    else:
        modality = "video_i2v"
    model = _pget("model") or "unknown"
    # The synthetic-fixture shot dict is what assertions read for character
    # rosters. Production sidecars don't carry the shot dict; reconstruct a
    # minimal one from provenance keys so the assertions that need it
    # (#4 proper-noun strip, #23 multi-char) have something to consume.
    shot = {
        "shot_id": _pget("shot_id") or sidecar_path.stem,
        "asset_data": {
            "characters": [],
            # Production sidecars don't enumerate refs by char_id; #23 will
            # soft-pass when characters is empty (n_chars < 2 branch).
            "location_id": None,
        },
        "audio_data": {"dialogue": []},
        "routing_data": {"has_dialogue": False},
        "has_dialogue": False,
    }
    return payload, modality, model, shot


def _load_builders_keys() -> list[tuple[str, str]]:
    """Snapshot BUILDERS.keys() as a list of (model, modality) tuples.

    Done as a function so the import error path can be caught at runtime
    and translated to an exit code 2.
    """
    from recoil.pipeline._lib.prompt_engine import BUILDERS  # type: ignore

    return list(BUILDERS.keys())


def main(argv: Optional[list[str]] = None) -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--self-check",
        action="store_true",
        help="Run the synthetic fixture audit with the standard local project/episode.",
    )
    parser.add_argument("--project", help="Project slug, e.g. tartarus")
    parser.add_argument("--episode", help="Episode token, e.g. ep_001")
    parser.add_argument(
        "--fixture-plan",
        default="recoil/pipeline/tools/audit_fixtures/audit_plan.json",
        help="Path to the synthetic plan JSON",
    )
    parser.add_argument(
        "--verbose", action="store_true", help="Print payload dict per path"
    )
    # R5 SYNTHESIS §1.8—inspect production sidecars after live fires.
    parser.add_argument(
        "--inspect-sidecars",
        action="store_true",
        help=(
            "Inspection mode—read recent production sidecars from "
            "projects/<project>/renders/<episode>/ and run the full "
            "assertion set against each. Catches 'synthetic passed, prod "
            "broke' the moment it happens."
        ),
    )
    parser.add_argument(
        "--recent",
        type=int,
        default=20,
        help=(
            "When --inspect-sidecars is set, inspect the N most recent "
            "sidecars (by mtime). Default 20. Pass 1 for post-fire hook."
        ),
    )
    parser.add_argument(
        "--sidecar-path",
        action="append",
        default=None,
        help=(
            "Explicit sidecar path to inspect (repeat for multiple). "
            "Overrides --recent. Used by dispatch_cli post-fire hook."
        ),
    )
    parser.add_argument(
        "--receipts-log",
        default="recoil/_dispatch_logs/receipts.jsonl",
        help="Receipts log path (used by assertions #25 + #26).",
    )
    args = parser.parse_args(argv)

    if args.self_check:
        args.project = args.project or "audit_fixture"
        args.episode = args.episode or "ep_001"
        _prepare_self_check_project(args.project, args.episode, args.fixture_plan)
    elif not args.project or not args.episode:
        parser.error("the following arguments are required: --project, --episode")

    if args.inspect_sidecars:
        return _inspect_sidecars_main(args)

    try:
        plan = load_fixture(Path(args.fixture_plan))
    except Exception as e:
        print(f"INFRA-ERROR loading fixture: {type(e).__name__}: {e}", file=sys.stderr)
        return 2

    # Synthetic fixture PNGs are gitignored (media-evict policy), so a fresh
    # clone has none — generate any missing ones before installing refs, else
    # ref-resolution returns empty and ~75 paths fail (REC-31). Idempotent.
    _ensure_synthetic_fixture_pngs(plan)

    # Install v2-shaped ref symlinks into the project's assets/ dir so the
    # ref_resolver (post project-paths-refactor-v2) can find them. Idempotent.
    _install_v2_fixture_refs(args.project)
    restore_fixture_ref_overlay = _install_fixture_ref_overlay()

    try:
        try:
            builders_keys = _load_builders_keys()
        except Exception as e:
            print(
                f"INFRA-ERROR importing BUILDERS: {type(e).__name__}: {e}",
                file=sys.stderr,
            )
            traceback.print_exc(file=sys.stderr)
            return 2

        try:
            from recoil.pipeline.tools.audit_assertions import run_all_assertions  # type: ignore
        except Exception as e:
            print(
                f"INFRA-ERROR importing assertions: {type(e).__name__}: {e}",
                file=sys.stderr,
            )
            return 2

        report = AuditReport()
        _print_header()

        for modality, model, modifier, kwargs in enumerate_paths(plan, builders_keys):
            result = run_path(
                modality=modality,
                model=model,
                modifier=modifier,
                kwargs=kwargs,
                project=args.project,
                episode=args.episode,
                run_all_assertions=run_all_assertions,
            )
            print(result.row())
            if args.verbose and not result.passed:
                print(f"        kwargs={kwargs}")
            report.results.append(result)

        _print_footer(report)

        if report.total == 0:
            print(
                "INFRA-WARNING: enumerated 0 paths — check fixture plan + BUILDERS",
                file=sys.stderr,
            )
            return 2

        return 0 if report.all_passed else 1
    finally:
        restore_fixture_ref_overlay()


if __name__ == "__main__":
    raise SystemExit(main())
