from __future__ import annotations

import json
from pathlib import Path

from PIL import Image
import pytest

from recoil.core.paths import ProjectPaths
from recoil.core.ref_types import RefAsset, ReferenceBundle
from recoil.pipeline._lib import board_builder as bb
from recoil.pipeline._lib import derivation_manifest
from recoil.pipeline._lib import dispatch_payload as dp
from recoil.pipeline._lib.derivation_sha import shotset_hash
from recoil.pipeline._lib.plan_loader import CanonicalShot, CharacterEntry
from recoil.pipeline._lib import story_gate as sg
from recoil.pipeline.core.persistence import (
    SceneVersionConflictError,
    load_scene,
    save_scene,
    scene_path,
)
from recoil.pipeline.core.receipts import GenerationReceipt
from recoil.pipeline.core.registry import MODALITY_STORYBOARD, RunResult
from recoil.pipeline.core.scene_version_store import SceneVersionStore
from recoil.pipeline.core.take import Beat, Scene


def _shot(n: int, *, sublocation: str | None = None, location_id: str | None = None) -> dict:
    shot_id = f"EP001_SH{n:02d}"
    shot = {
        "shot_id": shot_id,
        "scene_index": 1,
        "duration_s": 1.0,
        "intent": f"Beat {n} action.",
        "asset_data": {
            "characters": [],
            "location_id": location_id,
        },
        "spatial_data": {},
    }
    if sublocation:
        shot["spatial_data"]["sublocation"] = sublocation
    return shot


@pytest.fixture()
def project_paths(tmp_path, monkeypatch):
    """Provide a ``ProjectPaths`` rooted at ``tmp_path / "fixture_project"``.

    The root directory is created, and ``ProjectPaths.for_project`` is patched
    for the duration of the test so that every lookup, whatever project name
    is passed, returns this same instance.
    """
    fixture_paths = ProjectPaths(project_root=tmp_path / "fixture_project")
    fixture_paths.project_root.mkdir(parents=True)

    def _always_fixture_paths(cls, project=None):
        # Ignore the requested project: tests only ever use the temp root.
        return fixture_paths

    monkeypatch.setattr(
        ProjectPaths,
        "for_project",
        classmethod(_always_fixture_paths),
    )
    return fixture_paths


def _write_batch_scene(project_paths: ProjectPaths, shots: list[dict], *, board=None) -> Path:
    """Save a one-beat ``BATCH_004`` scene built from ``shots`` and return its path.

    The first shot doubles as the beat's ``shot`` entry and all shots are
    stored as ``batch_shots``. ``board`` is passed straight to the ``Beat``.
    ``project_paths`` is not read here; presumably callers pass it so the
    fixture that patches ``ProjectPaths.for_project`` is active before
    ``scene_path`` runs — confirm.
    """
    lead_shot = shots[0]
    batch_summary = {
        "shared_characters": [],
        # NOTE(review): dicts built by `_shot` keep location_id under
        # "asset_data", so this top-level lookup yields None for them —
        # confirm whether asset_data.location_id was intended.
        "shared_location_id": lead_shot.get("location_id"),
    }
    beat_metadata = {
        "scene_id": "BATCH_004",
        "modality": "r2v_multi",
        "shot": lead_shot,
        "batch_shots": shots,
        "batch_summary": batch_summary,
    }
    only_beat = Beat(beat_id="BATCH_004", beat_metadata=beat_metadata, board=board)
    scene = Scene(
        scene_id="BATCH_004",
        beats=[only_beat],
        scene_metadata={"episode": "ep_001", "project": "fixture_project"},
    )
    target = scene_path("fixture_project", "ep_001", "BATCH_004")
    save_scene(scene, target)
    return target


def _settings_passthrough(segments, **_kwargs):
    return [dict(seg, setting=f"Setting {i}") for i, seg in enumerate(segments, start=1)]


def _receipt(success: bool, *, error: str | None = None) -> GenerationReceipt:
    """Build a canned storyboard ``GenerationReceipt`` for a fake dispatch.

    A successful receipt reports ``/tmp/board.png`` as its output path; a
    failed one reports no output path. ``error`` is copied onto the run result.
    """
    if success:
        output_path = "/tmp/board.png"
    else:
        output_path = None
    outcome = RunResult(
        id="run_test",
        modality=MODALITY_STORYBOARD,
        output_path=output_path,
        metadata={},
        success=success,
        error=error,
    )
    return GenerationReceipt(
        receipt_id="rcpt_test",
        modality=MODALITY_STORYBOARD,
        caller_id="board_builder",
        project="fixture_project",
        episode=1,
        shot_id="EP001_CONT_004",
        timestamp_utc="2026-06-11T00:00:00Z",
        run_result=outcome,
    )


def _dispatch_writing_board(*, omit_sidecar_keys: set[str] | None = None):
    omit_sidecar_keys = omit_sidecar_keys or set()

    def fake_dispatch(modality, payload, *, context):
        storyboards_dir = Path(payload["save_dir"])
        storyboards_dir.mkdir(parents=True, exist_ok=True)
        png_path = storyboards_dir / f"{payload['filename_stem']}.png"
        Image.new("RGB", (20, 20), color=(32, 96, 160)).save(png_path)
        sidecar = dict(payload["sidecar_extra"])
        for key in omit_sidecar_keys:
            sidecar.pop(key, None)
        Path(f"{png_path}.json").write_text(
            json.dumps(sidecar),
            encoding="utf-8",
        )
        return _receipt(True)

    return fake_dispatch


def _resolver_grouping(shot_ids: list[str], *, include_hash: bool = True) -> dict:
    grouping = {
        "strategy": "continuity",
        "ordinal": 4,
        "shot_ids": list(shot_ids),
        "source_pass_id": None,
    }
    if include_hash:
        grouping["shotset_hash"] = shotset_hash(shot_ids)
    return grouping


def _resolver_board(
    *,
    status: str = "approved",
    source_sha256: str = "sha256:cached",
    artifact: str = "prep/ep_001/storyboards/EP001_CONT_004_v01.png",
) -> dict:
    return {
        "status": status,
        "artifact": artifact,
        "source_sha256": source_sha256,
        "approved_by": "director",
        "updated_at": "2026-06-14T00:00:00Z",
        "fingerprint_version": 2,
    }


def _resolver_beat(
    *,
    shots: list[dict] | None = None,
    board: dict | None = None,
    grouping: dict | None = None,
) -> Beat:
    """Build the ``EP001_CONT_004`` beat used by the resolver tests.

    ``shots`` defaults to a single ``_shot(10)`` when omitted or empty. The
    ``grouping`` key is added to the beat metadata only when a grouping is
    supplied, which lets tests exercise the no-grouping path.
    """
    batch_shots = shots or [_shot(10)]
    grouping_entry = {} if grouping is None else {"grouping": grouping}
    return Beat(
        beat_id="EP001_CONT_004",
        beat_metadata={
            "scene_id": "BATCH_004",
            "modality": "r2v_multi",
            "shot": batch_shots[0],
            "batch_shots": batch_shots,
            **grouping_entry,
        },
        board=board,
    )


def _resolver_source_sha(project: str, beat: Beat, *, version: int = 2) -> str:
    """Compute the board source sha for ``beat`` at fingerprint ``version``.

    The beat is turned into a primitive via ``bb._primitive_from_beat`` and
    its ``timing_segments`` (treated as empty when missing or falsy) are
    hashed with ``bb.compute_source_sha256``.
    """
    primitive = bb._primitive_from_beat(project, beat.beat_id, beat)
    raw_segments = getattr(primitive, "timing_segments", [])
    return bb.compute_source_sha256(list(raw_segments or []), version=version)


def _resolver_approved_record(project: str, beat: Beat, **overrides) -> dict:
    """Build an approved board record for ``beat``, then apply ``overrides``.

    ``source_sha256`` is the version-2 source sha of the beat and
    ``covered_shot_ids`` is copied from the beat's grouping ``shot_ids``
    (empty when there is no grouping). Keys in ``overrides`` replace or
    extend the defaults.
    """
    metadata = beat.beat_metadata or {}
    grouping = metadata.get("grouping") or {}
    defaults = {
        "status": "approved",
        "artifact": "prep/ep_001/storyboards/EP001_CONT_004_v01.png",
        "photoreal_artifact": "prep/ep_001/storyboards/EP001_CONT_004_v01_photoreal.png",
        "source_sha256": _resolver_source_sha(project, beat, version=2),
        "fingerprint_version": 2,
        "covered_shot_ids": list(grouping.get("shot_ids") or []),
        "approved_by": "director",
        "updated_at": "2026-06-14T00:00:00Z",
    }
    return {**defaults, **overrides}


def _stamp_resolver_record(project: str, beat: Beat, record: dict) -> str:
    """Stamp ``record`` into the derivation manifest (episode 1) and return its key.

    The key is the grouping's ``shotset_hash`` when present and truthy,
    otherwise it is derived from the grouping's ``shot_ids``.
    """
    grouping = (beat.beat_metadata or {}).get("grouping") or {}
    board_key = grouping.get("shotset_hash")
    if not board_key:
        board_key = shotset_hash(grouping["shot_ids"])
    derivation_manifest.stamp_board(project, 1, board_key, record)
    return board_key


def _judge_call_ok(prompt, images, *, model=None, max_attempts=3):
    if "TEXT-STAGEABILITY" in prompt:
        return {"stageable": True, "findings": []}
    return {
        "text_stageability": None,
        "panels": [
            {
                "index": 1,
                "description": "Panel 1 visible.",
                "forced_checks": {
                    name: {
                        "passed": True,
                        "severity": "SOFT",
                        "confidence": 0.9,
                        "reason": "visible",
                    }
                    for name in (
                        "depicts_beat",
                        "spatially_possible",
                        "eyeline_consistent",
                        "object_of_gaze_in_frame_and_front",
                        "causal_setup_present",
                    )
                },
                "fix_hint": "",
                "fix_hint_injectable": True,
            },
            {
                "index": 2,
                "description": "Panel 2 visible.",
                "forced_checks": {
                    name: {
                        "passed": True,
                        "severity": "SOFT",
                        "confidence": 0.9,
                        "reason": "visible",
                    }
                    for name in (
                        "depicts_beat",
                        "spatially_possible",
                        "eyeline_consistent",
                        "object_of_gaze_in_frame_and_front",
                        "causal_setup_present",
                    )
                },
                "fix_hint": "",
                "fix_hint_injectable": True,
            },
        ],
        "transitions": [
            {
                "from": 1,
                "to": 2,
                "forced_checks": {
                    "causal_setup_present": {
                        "passed": True,
                        "severity": "SOFT",
                        "confidence": 0.9,
                        "reason": "visible",
                    }
                },
                "fix_hint": "",
                "fix_hint_injectable": True,
            }
        ],
        "routing": {"class": "ok", "confidence": 0.9, "evidence": "coherent"},
    }


def test_panel_count_one_uses_2x2_grid_with_blank_cells(project_paths, monkeypatch):
    """A single-shot batch is planned on a 2x2 grid with three blank cells."""
    _write_batch_scene(project_paths, [_shot(10)])
    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)

    call_kwargs = {"step_runner": object(), "dry_run": True}
    plan = bb.build_and_dispatch_board("fixture_project", 1, "EP001_CONT_004", **call_kwargs)

    assert plan["size_override"] == "864x1536"  # iteration tier default half (REC-149)
    for expected_phrase in (
        "grid of 2 columns x 2 rows",
        "leave the trailing 3 cells completely blank",
    ):
        assert expected_phrase in plan["prompt"]
    assert len(plan["panels"]) == 1


def test_panel_count_seven_raises(project_paths, monkeypatch):
    """A batch of seven shots is rejected with ``BoardBuilderError``."""
    seven_shots = [_shot(number) for number in range(10, 17)]
    _write_batch_scene(project_paths, seven_shots)
    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)

    call_kwargs = {"step_runner": object(), "dry_run": True}
    with pytest.raises(bb.BoardBuilderError, match="batch has 7 segments"):
        bb.build_and_dispatch_board("fixture_project", 1, "EP001_CONT_004", **call_kwargs)


def test_size_override_table_including_five_panel_bound(project_paths, monkeypatch):
    """Five shots plan a 2x3 grid at 720x1920 and the size stays within bounds.

    The numeric limits checked below (multiples of 16, aspect ratio at most
    3, each side at most 3840, at most 8,294,400 pixels) are presumably the
    image model's size constraints — confirm against the builder.
    """
    five_shots = [_shot(number) for number in range(10, 15)]
    _write_batch_scene(project_paths, five_shots)
    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)

    call_kwargs = {"step_runner": object(), "dry_run": True}
    plan = bb.build_and_dispatch_board("fixture_project", 1, "EP001_CONT_004", **call_kwargs)

    # 5 panels -> 2x3 grid (one blank cell)
    size = plan["size_override"]
    assert size == "720x1920"  # iteration tier default half (REC-149)
    assert "grid of 2 columns x 3 rows" in plan["prompt"]

    width, height = map(int, size.split("x"))
    assert width % 16 == 0
    assert height % 16 == 0
    assert max(width, height) / min(width, height) <= 3
    assert width <= 3840
    assert height <= 3840
    assert width * height <= 8_294_400


def test_source_sha256_v1_golden_default_and_v2_drops_intent():
    """Pin the v1 digest and the v1/v2 sensitivity to ``setting`` and ``intent``.

    Version 1 is the default, matches a golden digest, ignores ``setting``
    and reacts to ``intent``. Version 2 ignores ``intent`` and produces a
    different digest from version 1 for the same input.
    """
    segment = {
        "shot_id": "EP001_SH10",
        "start_s": 0,
        "end_s": 1,
        "duration_s": 1,
        "intent": "A",
        "sublocation": "pod_platform",
        "setting": "first",
    }
    base = [segment]
    other_setting = [{**segment, "setting": "second"}]
    other_intent = [{**segment, "intent": "B"}]

    sha = bb.compute_source_sha256
    v1_base = sha(base, version=1)
    v2_base = sha(base, version=2)

    assert v1_base == "f1da0c4a88ca78e705dcfe64ba7f46558cb6460e2281cbe56a67922de8b53922"
    assert sha(base) == v1_base
    assert v1_base == sha(other_setting, version=1)
    assert v1_base != sha(other_intent, version=1)
    assert v2_base == sha(other_intent, version=2)
    assert v2_base != v1_base


def test_source_sha256_v2_tracks_structural_keys():
    """Changing any structural key of a segment changes the version-2 digest."""
    segment = {
        "shot_id": "EP001_SH10",
        "start_s": 0,
        "end_s": 1,
        "duration_s": 1,
        "intent": "A",
        "sublocation": "pod_platform",
    }
    baseline = bb.compute_source_sha256([segment], version=2)

    structural_changes = {
        "shot_id": "EP001_SH11",
        "start_s": 0.25,
        "end_s": 1.25,
        "duration_s": 1.25,
        "sublocation": "airlock",
    }
    for key, value in structural_changes.items():
        mutated = {**segment, key: value}
        assert bb.compute_source_sha256([mutated], version=2) != baseline


@pytest.mark.parametrize("version", [3, True, 1.0, 2.0])
def test_source_sha256_rejects_unknown_or_non_strict_int_version(version):
    """An unknown version, a bool, or a float version raises ``ValueError``."""
    expected_message = "unknown board fingerprint version"
    no_segments = []
    with pytest.raises(ValueError, match=expected_message):
        bb.compute_source_sha256(no_segments, version=version)


@pytest.mark.parametrize(
    "record_overrides",
    [
        {"status": "rejected"},
        {"needs_revalidation": True},
    ],
)
def test_resolve_board_for_spend_ssot_governs_cache_cannot_smuggle(
    project_paths,
    record_overrides,
):
    """An approved cached board cannot override a blocking stamped record.

    The beat's cached board says ``approved``, but the stamped record is
    either rejected or flagged ``needs_revalidation``; the resolver must
    report not-approved and hand back the beat's own board.
    """
    project = project_paths.project
    cached_board = _resolver_board()
    beat = _resolver_beat(board=cached_board, grouping=_resolver_grouping(["EP001_SH10"]))
    blocking_record = _resolver_approved_record(project, beat, **record_overrides)
    _stamp_resolver_record(project, beat, blocking_record)

    approved, board = bb.resolve_board_for_spend(project, 1, beat)

    assert approved is False
    assert board == beat.board


def test_resolve_board_for_spend_carries_approved_record_from_ssot_empty_cache(
    project_paths,
):
    """With no cached board, an approved stamped record is returned as the board.

    The grouping carries no ``shotset_hash``, so the stamping helper derives
    the key from the shot ids. The returned board must equal the cache
    projection of the record, including both artifact paths.
    """
    project = project_paths.project
    hashless_grouping = _resolver_grouping(["EP001_SH10"], include_hash=False)
    beat = _resolver_beat(board=None, grouping=hashless_grouping)
    record = _resolver_approved_record(project, beat)
    _stamp_resolver_record(project, beat, record)

    approved, board = bb.resolve_board_for_spend(project, 1, beat)

    assert approved is True
    assert board == bb.board_record_to_cache(record)
    for artifact_key in ("artifact", "photoreal_artifact"):
        assert board[artifact_key] == record[artifact_key]
    # Constructing a Beat with the projected board must not raise.
    Beat(beat_id="validated", board=bb.board_record_to_cache(record))


def test_resolve_board_for_spend_derivable_hash_missing_record_blocks_cache_smuggle(
    project_paths,
):
    """A grouped beat with no stamped record is not approved by its cache alone.

    Nothing is stamped in this test, so the only "approved" signal is the
    beat's cached board; the resolver must still report not-approved.
    """
    grouping = _resolver_grouping(["EP001_SH10"])
    beat = _resolver_beat(board=_resolver_board(), grouping=grouping)

    approved, board = bb.resolve_board_for_spend(project_paths.project, 1, beat)

    assert approved is False
    assert board == beat.board


@pytest.mark.parametrize(
    ("cache", "expected"),
    [
        (_resolver_board(status="approved"), True),
        (_resolver_board(status="rejected"), False),
        (None, False),
    ],
)
def test_resolve_board_for_spend_no_grouping_uses_legacy_cache(project_paths, cache, expected):
    """Without a grouping, approval follows the cached board's status.

    An approved cache approves, a rejected or missing cache does not, and in
    every case the cached board is returned unchanged.
    """
    ungrouped_beat = _resolver_beat(board=cache, grouping=None)
    project = project_paths.project

    approved, board = bb.resolve_board_for_spend(project, 1, ungrouped_beat)

    assert approved is expected
    assert board == cache


def test_resolve_board_for_spend_freshness_blocks_structurally_stale_approved_record(
    project_paths,
):
    """An approved record only approves a beat whose structure still matches it.

    The record's source sha is computed from the default 1.0s shot. A beat
    with the same shot id but a 2.0s duration must be blocked; a beat built
    from the default shot again must be approved by the same stamped record.
    """
    project = project_paths.project
    shot_ids = ["EP001_SH10"]

    approved_beat = _resolver_beat(board=None, grouping=_resolver_grouping(shot_ids))
    record = _resolver_approved_record(project, approved_beat)

    # Same shot id, different duration: the stamped record is now stale.
    longer_shot = _shot(10)
    longer_shot["duration_s"] = 2.0
    stale_beat = _resolver_beat(
        shots=[longer_shot],
        board=_resolver_board(source_sha256=record["source_sha256"]),
        grouping=_resolver_grouping(shot_ids),
    )
    _stamp_resolver_record(project, stale_beat, record)

    approved, board = bb.resolve_board_for_spend(project, 1, stale_beat)

    assert approved is False
    assert board == stale_beat.board

    # Back to the default shot: the very same record is fresh again.
    fresh_beat = _resolver_beat(board=None, grouping=_resolver_grouping(shot_ids))
    approved, board = bb.resolve_board_for_spend(project, 1, fresh_beat)

    assert approved is True
    assert board == bb.board_record_to_cache(record)


def test_resolve_board_for_spend_blocks_board_approved_against_other_version(
    project_paths,
):
    """REC-231: a board approved against another scene version must not approve spend.

    boards[shotset_hash] is shared by sibling versions (same shot-id set →
    same key, same structure-only freshness sha) that differ ONLY in raw/description.
    A board approved against a DIFFERENT active version must NOT bless a paid render of
    this version's content (the revert-to-a-differently-prompted-sibling spend bug). The
    batch's active version here is 1 (no versions manifest): scene_version=2 BLOCKS,
    scene_version=1 (matching) approves.
    """
    project = project_paths.project
    # No cached board on the beat: approval can only come from the stamped record.
    beat = _resolver_beat(board=None, grouping=_resolver_grouping(["EP001_SH10"]))

    # Fresh, approved, but stamped against version 2 while active==1 → BLOCK.
    rec_v2 = _resolver_approved_record(project, beat, scene_version=2)
    _stamp_resolver_record(project, beat, rec_v2)
    approved, board = bb.resolve_board_for_spend(project, 1, beat)
    assert approved is False
    assert board == beat.board

    # Same record stamped against the ACTIVE version (1) → APPROVED.
    # Both stamps use the same beat, hence the same key as the v2 stamp above.
    rec_v1 = _resolver_approved_record(project, beat, scene_version=1)
    _stamp_resolver_record(project, beat, rec_v1)
    approved, board = bb.resolve_board_for_spend(project, 1, beat)
    assert approved is True
    assert board == bb.board_record_to_cache(rec_v1)


def test_resolve_board_for_spend_fail_closed_when_freshness_uncomputable(project_paths):
    """A beat whose freshness cannot be computed is not approved.

    The record is approved against a well-formed beat, then stamped for a
    beat whose metadata has no ``shot`` or ``batch_shots`` entries —
    presumably what makes the freshness sha uncomputable; confirm against the
    resolver. The resolver must report not-approved and return the beat's
    own board.
    """
    project = project_paths.project
    healthy_beat = _resolver_beat(board=None, grouping=_resolver_grouping(["EP001_SH10"]))
    record = _resolver_approved_record(project, healthy_beat)

    shotless_metadata = {
        "scene_id": "BATCH_004",
        "modality": "r2v_multi",
        "grouping": _resolver_grouping(["EP001_SH10"]),
    }
    broken_beat = Beat(
        beat_id="EP001_CONT_004",
        beat_metadata=shotless_metadata,
        board=_resolver_board(source_sha256=record["source_sha256"]),
    )
    _stamp_resolver_record(project, broken_beat, record)

    approved, board = bb.resolve_board_for_spend(project, 1, broken_beat)

    assert approved is False
    assert board == broken_beat.board


def test_next_board_version_scans_disk(tmp_path):
    """``next_board_version`` is one past the highest ``_vNN.png`` on disk.

    An empty directory yields version 1. With ``v01`` and ``v03`` boards
    present (a gap at ``v02``) plus an unrelated notes file, the next
    version is 4.
    """
    assert bb.next_board_version(tmp_path, "EP001_CONT_004") == 1
    (tmp_path / "EP001_CONT_004_v01.png").write_bytes(b"one")
    (tmp_path / "EP001_CONT_004_v03.png").write_bytes(b"three")
    # Explicit encoding, like every other text write in this module, so the
    # test does not depend on the platform's locale default.
    (tmp_path / "EP001_CONT_004_notes.txt").write_text("ignore", encoding="utf-8")

    assert bb.next_board_version(tmp_path, "EP001_CONT_004") == 4


def test_dispatch_success_payload_sidecar_fingerprint_and_board_persisted(
    project_paths,
    monkeypatch,
):
    """Successful dispatch: check the outgoing payload and the persisted board.

    Two shots share a character (JADE), a location and a sublocation.
    Reference resolution and validation are stubbed, and ``dispatch`` is
    replaced by a recorder. The test then checks the payload and sidecar
    fields, that a ``proposed`` board with matching fingerprint data was
    saved to the scene, and that no story-gate output exists (the
    ``RECOIL_STORY_GATE`` variable is not set by this test).
    """
    shots = [
        _shot(number, sublocation="pod_platform", location_id="int_lab")
        for number in (10, 11)
    ]
    for shot in shots:
        # One fresh list and dict per shot; the shots share no mutable state.
        shot["asset_data"]["characters"] = [{"char_id": "JADE", "wardrobe_phase_id": "p1"}]
    scene_file = _write_batch_scene(project_paths, shots)

    bible_path = project_paths.global_bible_path
    bible_path.parent.mkdir(parents=True)
    bible_path.write_text(
        '{"characters":{"JADE":{"visual_description":"Lean salvager. Second sentence."}}}',
        encoding="utf-8",
    )

    char_dir = project_paths.project_root / "assets" / "char" / "jade" / "base"
    char_dir.mkdir(parents=True)
    front_ref = char_dir / "jade_front_base_v01.png"
    profile_ref = char_dir / "jade_profile_base_v01.png"
    front_ref.write_bytes(b"front")
    profile_ref.write_bytes(b"profile")

    subloc_dir = project_paths.asset_look_dir("loc", "int_lab", "base") / "sublocations"
    subloc_ref = subloc_dir / "sublocation_pod_platform_v01.png"
    subloc_dir.mkdir(parents=True)
    subloc_ref.write_bytes(b"fake-image")

    def fake_character_bundle(paths, char_id, phase=None):
        turns = tuple(
            RefAsset(path=ref, role="identity", subject=char_id, kind="turn", view=view)
            for view, ref in (("front", front_ref), ("profile", profile_ref))
        )
        return ReferenceBundle(turns)

    def fake_sublocation_ref(paths, location_id, name):
        return subloc_ref

    def accept_any_ref(path):
        return None

    captured = {}

    def fake_dispatch(modality, payload, *, context):
        captured.update(modality=modality, payload=payload, context=context)
        return _receipt(True)

    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)
    monkeypatch.setattr(bb, "resolve_character_bundle", fake_character_bundle)
    monkeypatch.setattr(bb, "sublocation_ref", fake_sublocation_ref)
    monkeypatch.setattr(bb, "validate_ref_file", accept_any_ref)
    monkeypatch.setattr(bb, "dispatch", fake_dispatch)

    outcome = bb.build_and_dispatch_board(
        "fixture_project",
        1,
        "EP001_CONT_004",
        step_runner=object(),
    )

    # What was handed to dispatch.
    assert outcome["success"] is True
    assert outcome["artifact"] == "prep/ep_001/storyboards/EP001_CONT_004_v01.png"
    sent_payload = captured["payload"]
    assert captured["modality"] == MODALITY_STORYBOARD
    assert sent_payload["size_override"] == "864x1536"  # iteration tier default half (REC-149)
    assert sent_payload["filename_stem"] == "EP001_CONT_004_v01"

    sidecar_extra = sent_payload["sidecar_extra"]
    assert sidecar_extra["kind"] == "storyboard"
    assert sidecar_extra["batch_id"] == "EP001_CONT_004"
    assert sidecar_extra["panels"] == [
        {"segment_id": "EP001_SH10", "setting": "Setting 1"},
        {"segment_id": "EP001_SH11", "setting": "Setting 2"},
    ]
    assert sidecar_extra["sublocation"] == "pod_platform"
    assert sidecar_extra["identity_refs"] == [
        "assets/char/jade/base/jade_front_base_v01.png",
        "assets/char/jade/base/jade_profile_base_v01.png",
    ]
    assert sidecar_extra["sublocation_ref"] == "sublocations/sublocation_pod_platform_v01.png"
    assert sidecar_extra["model"] == "gpt-image-2"
    assert sidecar_extra["strategy"] == "director_arm_a"
    assert sidecar_extra["fingerprint_version"] == bb.BOARD_FINGERPRINT_VERSION
    assert sidecar_extra["version"] == 1
    assert "Lean salvager." in sidecar_extra["prompt"]

    # What was written back to the scene file.
    persisted_board = load_scene(scene_file).beats[0].board
    assert persisted_board is not None
    assert persisted_board["status"] == "proposed"
    assert persisted_board["artifact"] == outcome["artifact"]
    assert persisted_board["source_sha256"] == sidecar_extra["source_sha256"]
    assert persisted_board["fingerprint_version"] == bb.BOARD_FINGERPRINT_VERSION
    assert outcome["fingerprint_version"] == bb.BOARD_FINGERPRINT_VERSION

    # No story-gate projection and no verdict file in this test.
    assert "story_gate" not in outcome
    assert "story_gate" not in persisted_board
    verdict_file = project_paths.episode_storyboards_dir(1) / "EP001_CONT_004_v01.verdict.json"
    assert not verdict_file.exists()


def test_story_gate_shadow_writes_verdict_projection_and_still_proposes(
    project_paths,
    monkeypatch,
):
    """Shadow mode with a passing judge writes a verdict and still proposes the board.

    The verdict file is written next to the board, its route and relative
    path are projected onto both the result and the persisted board, and the
    board status stays ``proposed``.
    """
    scene_file = _write_batch_scene(project_paths, [_shot(10), _shot(11)])
    monkeypatch.setenv("RECOIL_STORY_GATE", "shadow")
    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)
    monkeypatch.setattr(bb, "dispatch", _dispatch_writing_board())
    monkeypatch.setattr(sg, "_judge_call", _judge_call_ok)

    outcome = bb.build_and_dispatch_board(
        "fixture_project",
        1,
        "EP001_CONT_004",
        step_runner=object(),
    )

    verdict_name = "EP001_CONT_004_v01.verdict.json"
    verdict_file = project_paths.episode_storyboards_dir(1) / verdict_name
    verdict = json.loads(verdict_file.read_text(encoding="utf-8"))
    persisted_board = load_scene(scene_file).beats[0].board

    assert outcome["success"] is True
    gate = outcome["story_gate"]
    assert gate["route"] == "ok"
    assert gate["verdict_path"] == "prep/ep_001/storyboards/EP001_CONT_004_v01.verdict.json"
    assert verdict["routing"]["class"] == "ok"
    assert verdict["text_stageability"] == {"stageable": True, "findings": []}
    assert persisted_board is not None
    assert persisted_board["status"] == "proposed"
    assert persisted_board["story_gate"]["route"] == "ok"
    assert persisted_board["story_gate"]["verdict_path"] == outcome["story_gate"]["verdict_path"]


def test_story_gate_shadow_judge_unavailable_still_proposes(project_paths, monkeypatch):
    """Shadow mode with an unavailable judge records that and still proposes.

    Every judge call raises ``StoryGateJudgeUnavailable``; the build must
    succeed, route ``judge_unavailable`` in the result, the verdict file and
    the persisted board, and leave the board ``proposed``.
    """
    scene_file = _write_batch_scene(project_paths, [_shot(10), _shot(11)])
    monkeypatch.setenv("RECOIL_STORY_GATE", "shadow")
    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)
    monkeypatch.setattr(bb, "dispatch", _dispatch_writing_board())

    def judge_always_down(*args, **kwargs):
        raise sg.StoryGateJudgeUnavailable("rate limited")

    monkeypatch.setattr(sg, "_judge_call", judge_always_down)

    outcome = bb.build_and_dispatch_board(
        "fixture_project",
        1,
        "EP001_CONT_004",
        step_runner=object(),
    )

    verdict_name = "EP001_CONT_004_v01.verdict.json"
    verdict_file = project_paths.episode_storyboards_dir(1) / verdict_name
    verdict = json.loads(verdict_file.read_text(encoding="utf-8"))
    persisted_board = load_scene(scene_file).beats[0].board

    assert outcome["success"] is True
    assert outcome["story_gate"]["route"] == "judge_unavailable"
    assert verdict["routing"]["class"] == "judge_unavailable"
    assert persisted_board is not None
    assert persisted_board["status"] == "proposed"
    assert persisted_board["story_gate"]["route"] == "judge_unavailable"


def test_story_gate_shadow_preserves_text_unavailable_when_image_judge_succeeds(
    project_paths,
    monkeypatch,
):
    """A text-judge outage is kept in the verdict even when routing ends up ``ok``.

    The fake judge raises ``StoryGateJudgeUnavailable`` for the
    ``TEXT-STAGEABILITY`` prompt and answers every other prompt with the
    passing verdict. The overall route must be ``ok`` while the verdict's
    ``text_stageability`` records the outage and its reason.
    """
    _write_batch_scene(project_paths, [_shot(10), _shot(11)])
    monkeypatch.setenv("RECOIL_STORY_GATE", "shadow")
    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)
    monkeypatch.setattr(bb, "dispatch", _dispatch_writing_board())
    calls = []

    def text_unavailable_then_ok(prompt, images, *, model=None, max_attempts=3):
        # Record every prompt before deciding how to answer it.
        calls.append(prompt)
        if "TEXT-STAGEABILITY" in prompt:
            raise sg.StoryGateJudgeUnavailable("text outage")
        return _judge_call_ok(prompt, images, model=model, max_attempts=max_attempts)

    monkeypatch.setattr(sg, "_judge_call", text_unavailable_then_ok)

    result = bb.build_and_dispatch_board(
        "fixture_project",
        1,
        "EP001_CONT_004",
        step_runner=object(),
    )

    verdict_path = project_paths.episode_storyboards_dir(1) / "EP001_CONT_004_v01.verdict.json"
    verdict = json.loads(verdict_path.read_text(encoding="utf-8"))

    assert result["success"] is True
    assert result["story_gate"]["route"] == "ok"
    assert verdict["routing"]["class"] == "ok"
    assert verdict["text_stageability"] == {
        "route": "judge_unavailable",
        "reason": "text outage",
    }
    # The recorded prompts were previously collected but never checked: make
    # sure the outage came from a real text-stageability judge call.
    assert any("TEXT-STAGEABILITY" in prompt for prompt in calls)


def test_story_gate_shadow_missing_prompt_sidecar_writes_unavailable_verdict(
    project_paths,
    monkeypatch,
):
    """A sidecar without ``prompt`` yields a ``judge_unavailable`` verdict.

    The fake dispatch drops the ``prompt`` key from the written sidecar. The
    build must still succeed and propose the board, and the verdict's
    routing evidence must mention ``prompt``.
    """
    scene_file = _write_batch_scene(project_paths, [_shot(10), _shot(11)])
    promptless_dispatch = _dispatch_writing_board(omit_sidecar_keys={"prompt"})
    monkeypatch.setenv("RECOIL_STORY_GATE", "shadow")
    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)
    monkeypatch.setattr(bb, "dispatch", promptless_dispatch)
    monkeypatch.setattr(sg, "_judge_call", _judge_call_ok)

    outcome = bb.build_and_dispatch_board(
        "fixture_project",
        1,
        "EP001_CONT_004",
        step_runner=object(),
    )

    verdict_name = "EP001_CONT_004_v01.verdict.json"
    verdict_file = project_paths.episode_storyboards_dir(1) / verdict_name
    verdict = json.loads(verdict_file.read_text(encoding="utf-8"))
    persisted_board = load_scene(scene_file).beats[0].board

    assert outcome["success"] is True
    assert outcome["story_gate"]["route"] == "judge_unavailable"
    routing = verdict["routing"]
    assert routing["class"] == "judge_unavailable"
    assert "prompt" in routing["evidence"]
    assert persisted_board is not None
    assert persisted_board["status"] == "proposed"
    assert persisted_board["story_gate"]["route"] == "judge_unavailable"


def test_story_gate_shadow_unexpected_exception_writes_unavailable_verdict(
    project_paths,
    monkeypatch,
):
    """An unexpected error inside the gate becomes a ``judge_unavailable`` verdict.

    ``StoryGate.evaluate_board`` is replaced with a function that raises
    ``RuntimeError``. The build must still succeed and propose the board,
    and the verdict's routing evidence must name ``RuntimeError``.
    """
    scene_file = _write_batch_scene(project_paths, [_shot(10), _shot(11)])
    monkeypatch.setenv("RECOIL_STORY_GATE", "shadow")
    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)
    monkeypatch.setattr(bb, "dispatch", _dispatch_writing_board())
    monkeypatch.setattr(sg, "_judge_call", _judge_call_ok)

    def exploding_evaluate(self, packet, *, use_crops=True, model=None):
        raise RuntimeError("judge bug")

    monkeypatch.setattr(sg.StoryGate, "evaluate_board", exploding_evaluate)

    outcome = bb.build_and_dispatch_board(
        "fixture_project",
        1,
        "EP001_CONT_004",
        step_runner=object(),
    )

    verdict_name = "EP001_CONT_004_v01.verdict.json"
    verdict_file = project_paths.episode_storyboards_dir(1) / verdict_name
    verdict = json.loads(verdict_file.read_text(encoding="utf-8"))
    persisted_board = load_scene(scene_file).beats[0].board

    assert outcome["success"] is True
    assert outcome["story_gate"]["route"] == "judge_unavailable"
    routing = verdict["routing"]
    assert routing["class"] == "judge_unavailable"
    assert "RuntimeError" in routing["evidence"]
    assert persisted_board is not None
    assert persisted_board["status"] == "proposed"
    assert persisted_board["story_gate"]["route"] == "judge_unavailable"


def test_story_gate_mode_off_writes_no_verdict_or_projection(project_paths, monkeypatch):
    """With ``RECOIL_STORY_GATE=off`` the judge never runs and nothing is projected.

    The judge stub fails the test if called. After a successful build there
    must be no verdict file and no ``story_gate`` key on either the result
    or the persisted board.
    """
    scene_file = _write_batch_scene(project_paths, [_shot(10), _shot(11)])

    def judge_must_not_run(*args, **kwargs):
        return pytest.fail("story gate judge must not run when off")

    monkeypatch.setenv("RECOIL_STORY_GATE", "off")
    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)
    monkeypatch.setattr(bb, "dispatch", _dispatch_writing_board())
    monkeypatch.setattr(sg, "_judge_call", judge_must_not_run)

    outcome = bb.build_and_dispatch_board(
        "fixture_project",
        1,
        "EP001_CONT_004",
        step_runner=object(),
    )

    verdict_name = "EP001_CONT_004_v01.verdict.json"
    verdict_file = project_paths.episode_storyboards_dir(1) / verdict_name
    persisted_board = load_scene(scene_file).beats[0].board

    assert outcome["success"] is True
    assert "story_gate" not in outcome
    assert not verdict_file.exists()
    assert persisted_board is not None
    assert "story_gate" not in persisted_board


def test_dispatch_failure_does_not_persist_board(project_paths, monkeypatch):
    """A failed dispatch surfaces its error and leaves the beat without a board."""
    scene_file = _write_batch_scene(project_paths, [_shot(10), _shot(11)])

    def failing_dispatch(*args, **kwargs):
        return _receipt(False, error="boom")

    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)
    monkeypatch.setattr(bb, "dispatch", failing_dispatch)

    outcome = bb.build_and_dispatch_board(
        "fixture_project",
        1,
        "EP001_CONT_004",
        step_runner=object(),
    )

    assert outcome["success"] is False
    assert outcome["error"] == "boom"
    persisted_beat = load_scene(scene_file).beats[0]
    assert persisted_beat.board is None


def test_active_version_move_after_load_rejects_before_storyboard_dispatch(project_paths, monkeypatch):
    """A scene-version change after the builder's load aborts before any dispatch.

    A candidate scene version is written to the version store up front. The
    builder's loader is wrapped so that, right after the real load returns,
    the store is conformed to version 2. The build must then raise
    ``SceneVersionConflictError`` and the dispatch stub must never be called.
    """
    scene_file = _write_batch_scene(project_paths, [_shot(10), _shot(11)])
    store = SceneVersionStore("fixture_project", "ep_001")
    # Candidate built from the beat that was just saved to the scene file.
    store.write_scene_candidate(
        "BATCH_004",
        Scene(
            scene_id="BATCH_004",
            beats=[load_scene(scene_file).beats[0]],
            scene_metadata={"episode": "ep_001", "project": "fixture_project"},
        ),
    )
    # Keep a handle on the real loader before it is patched below.
    real_load = bb.load_scene_active_with_version

    def _load_then_move(*args, **kwargs):
        # Load first, then conform: the builder keeps what it loaded while
        # the store has moved on to version 2.
        loaded = real_load(*args, **kwargs)
        store.conform("BATCH_004", 2)
        return loaded

    dispatch_calls = []
    monkeypatch.setattr(bb, "load_scene_active_with_version", _load_then_move)
    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)
    monkeypatch.setattr(
        bb,
        "dispatch",
        # list.append returns None, so `or` falls through to the receipt.
        lambda *args, **kwargs: dispatch_calls.append(args) or _receipt(True),
    )

    with pytest.raises(SceneVersionConflictError):
        bb.build_and_dispatch_board(
            "fixture_project",
            1,
            "EP001_CONT_004",
            step_runner=object(),
        )

    # The conflict must surface before any dispatch happens.
    assert dispatch_calls == []


def test_dry_run_zero_dispatch_and_zero_state_writes(project_paths, monkeypatch):
    """Dry-run must not dispatch, save scene status, or call the story-gate judge.

    Each forbidden collaborator is replaced with a stub that fails the test
    when invoked, and the scene file text is compared before and after the
    call to show nothing was rewritten. The story gate runs in ``shadow`` mode.
    """
    scene_file = _write_batch_scene(project_paths, [_shot(10), _shot(11)])
    before = scene_file.read_text(encoding="utf-8")
    monkeypatch.setenv("RECOIL_STORY_GATE", "shadow")
    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)
    monkeypatch.setattr(
        bb,
        "dispatch",
        lambda *args, **kwargs: pytest.fail("dry_run must not dispatch"),
    )
    monkeypatch.setattr(
        bb,
        "save_active_scene_status",
        lambda *args, **kwargs: pytest.fail("dry_run must not save scene"),
    )
    monkeypatch.setattr(
        sg,
        "_judge_call",
        lambda *args, **kwargs: pytest.fail("dry_run must not call story gate judge"),
    )

    result = bb.build_and_dispatch_board(
        "fixture_project",
        1,
        "EP001_CONT_004",
        step_runner=object(),
        dry_run=True,
    )

    # Compare with a tolerance: exact equality on a float estimate breaks as
    # soon as the value is produced by arithmetic instead of a literal.
    assert result["estimated_cost_usd"] == pytest.approx(0.41)
    assert "zero story-gate judge calls" in result["note"]
    assert scene_file.read_text(encoding="utf-8") == before


def test_story_gate_enforce_raises_before_dispatch(project_paths, monkeypatch):
    """With ``RECOIL_STORY_GATE=enforce`` the build raises ``NotImplementedError``
    and the dispatch stub (which would fail the test) is never reached."""
    _write_batch_scene(project_paths, [_shot(10), _shot(11)])

    def _forbidden_dispatch(*args, **kwargs):
        pytest.fail("enforce must fail before dispatch")

    monkeypatch.setenv("RECOIL_STORY_GATE", "enforce")
    monkeypatch.setattr(bb, "derive_settings", _settings_passthrough)
    monkeypatch.setattr(bb, "dispatch", _forbidden_dispatch)

    expected_message = "story gate enforce ships in v1.1"
    with pytest.raises(NotImplementedError, match=expected_message):
        bb.build_and_dispatch_board(
            "fixture_project", 1, "EP001_CONT_004", step_runner=object()
        )


def test_mixed_sublocations_attach_first_ref_but_never_lock(project_paths, monkeypatch):
    """Segments of [pod_platform, <none>] are not a shared locked sublocation:
    the lock needs EVERY segment to name the SAME sublocation. The ref is
    still attached and recorded, only unlocked."""
    sublocations_dir = (
        project_paths.asset_look_dir("loc", "int_lab", "base") / "sublocations"
    )
    subloc = sublocations_dir / "sublocation_pod_platform_v01.png"
    sublocations_dir.mkdir(parents=True)
    subloc.write_bytes(b"fake-image")

    # Resolve every sublocation lookup to the fake file and skip validation.
    monkeypatch.setattr(bb, "sublocation_ref", lambda paths, location_id, name: subloc)
    monkeypatch.setattr(bb, "validate_ref_file", lambda path: None)

    class _Primitive:
        location_id = "int_lab"

    with_sublocation = {"shot_id": "EP001_SH10", "sublocation": "pod_platform"}
    without_sublocation = {"shot_id": "EP001_SH11"}
    refs: list[str] = []
    ref_layout: dict = {}

    sidecar = bb._append_sublocation_ref(
        project_paths,
        _Primitive(),
        [with_sublocation, without_sublocation],
        refs,
        ref_layout,
    )

    # The ref is attached, but the layout reports it as not locked.
    assert refs == [str(subloc)]
    assert ref_layout["sublocation_locked"] is False

    # REC-149: the unlocked attachment is RECORDED in sublocation_refs
    # (previously it went unrecorded); nothing is reported as shared.
    assert sidecar["shared_sublocation"] is None
    assert sidecar["shared_ref"] is None
    recorded_refs = sidecar["sublocation_refs"]
    assert len(recorded_refs) == 1
    only_entry = recorded_refs[0]
    assert only_entry["slug"] == "pod_platform"
    assert only_entry["ref"].endswith("sublocation_pod_platform_v01.png")


def test_dry_run_is_zero_dispatch_with_production_prompt(project_paths, monkeypatch):
    """Dry-run dispatches nothing and leaves the scene file untouched, but DOES
    run ``derive_settings`` so the reviewed prompt matches the live run exactly.

    The scene file text is snapshotted before the call and compared afterwards;
    ``dispatch`` is replaced with a stub that fails the test if it is reached.
    """
    shots = [
        _shot(10, sublocation="pod_platform", location_id="int_lab"),
        _shot(11, sublocation="pod_platform", location_id="int_lab"),
    ]
    scene_file = _write_batch_scene(project_paths, shots)
    # Snapshot the persisted scene so the "no state writes" half of the
    # contract is actually asserted rather than only documented.
    before = scene_file.read_text(encoding="utf-8")

    def _boom(*a, **k):
        # pytest.fail, as in the sibling dry-run test's dispatch stub.
        pytest.fail("dispatch must not run in dry-run")

    settings_calls = []

    def _settings_spy(segments, **kwargs):
        # Count the call and stamp every segment with a recognisable setting.
        settings_calls.append(1)
        return [dict(s, setting="Subloc: spied") for s in segments]

    monkeypatch.setattr(bb, "derive_settings", _settings_spy)
    monkeypatch.setattr(bb, "dispatch", _boom)
    monkeypatch.setattr(
        bb,
        "resolve_character_bundle",
        lambda paths, char_id, phase=None: ReferenceBundle(()),
    )
    monkeypatch.setattr(bb, "sublocation_ref", lambda *a, **k: None)
    monkeypatch.setattr(bb, "validate_ref_file", lambda path: None)
    monkeypatch.setattr(
        bb, "_append_sublocation_ref",
        lambda *a, **k: {"shared_sublocation": None, "shared_ref": None},
    )

    result = bb.build_and_dispatch_board(
        project_paths.project, 1, "EP001_CONT_004",
        step_runner=None, dry_run=True,
    )
    assert result["estimated_cost_usd"] > 0
    assert settings_calls, "derive_settings must run in dry-run (prompt parity)"
    assert all(p["setting"] == "Subloc: spied" for p in result["panels"])
    assert scene_file.read_text(encoding="utf-8") == before, "dry-run must not rewrite the scene file"


def test_identity_refs_fall_back_to_base_when_phase_specific_absent(project_paths, monkeypatch):
    """When a beat carries a wardrobe phase but only base (unsuffixed) identity
    refs resolve, collection must retry without the phase instead of coming up
    empty, mirroring the _collect_reference_images cascade."""
    requested_phases = []

    def _fake_resolve(paths, char_id, phase=None):
        requested_phases.append(phase)
        if not phase:
            # Base lookup: front and profile turn refs are available.
            front = RefAsset(path=Path("/refs/jade_front.png"), role="identity", subject=char_id, kind="turn", view="front")
            profile = RefAsset(path=Path("/refs/jade_profile.png"), role="identity", subject=char_id, kind="turn", view="profile")
            return ReferenceBundle((front, profile))
        # Phase-specific lookup: nothing to offer.
        return ReferenceBundle(())

    def _no_sublocation(*a, **k):
        return {"shared_sublocation": None, "shared_ref": None}

    monkeypatch.setattr(bb, "resolve_character_bundle", _fake_resolve)
    monkeypatch.setattr(bb, "_shared_characters", lambda beat, primitive: ["JADE"])
    monkeypatch.setattr(bb, "_phase_for_char", lambda beat, char_id: "p1")
    monkeypatch.setattr(bb, "_append_sublocation_ref", _no_sublocation)
    monkeypatch.setattr(bb, "_project_relative", lambda paths, p: str(p))

    class _Primitive:
        location_id = None

    refs, _layout, _identity_refs, _sidecar = bb._collect_board_refs(
        project_paths, _FakeBeat(), _Primitive(), []
    )

    assert refs == ["/refs/jade_front.png", "/refs/jade_profile.png"]
    # First attempt used the phase, the retry dropped it.
    assert requested_phases == ["p1", None]


def test_board_r2v_share_char_bundle_projection(project_paths, monkeypatch):
    """Board refs and r2v refs are both drawn from one fixture character bundle.

    The bundle (hero plus front/profile/back turns) is served to both modules.
    The board collects front and profile, the r2v collection leads with the
    hero, and every board ref also appears among the r2v refs.
    """
    monkeypatch.delenv("RECOIL_USE_COMPOSITE_SHEETS", raising=False)
    monkeypatch.setitem(dp._project_config_cache, project_paths.project, {})
    monkeypatch.setattr(dp, "load_project_config", lambda project: {})
    monkeypatch.setattr(dp, "resolve_location_refs", lambda paths, location_id: {})

    refs_dir = project_paths.project_root / "refs"
    hero = refs_dir / "jade_hero.png"
    # Insertion order (front, profile, back) fixes the bundle order below.
    turn_paths = {
        "front": refs_dir / "jade_front.png",
        "profile": refs_dir / "jade_profile.png",
        "back": refs_dir / "jade_back.png",
    }

    def resolve_fixture(paths, subject, phase=None):
        subject_id = subject.lower()
        hero_asset = RefAsset(
            path=hero,
            role="identity",
            subject=subject_id,
            kind="identity",
            is_hero=True,
        )
        turn_assets = [
            RefAsset(
                path=turn_path,
                role="identity",
                subject=subject_id,
                kind="turn",
                view=view,
            )
            for view, turn_path in turn_paths.items()
        ]
        return ReferenceBundle((hero_asset, *turn_assets))

    def _no_sublocation(*a, **k):
        return {"shared_sublocation": None, "shared_ref": None}

    # Both modules resolve characters through the same fixture.
    for module in (bb, dp):
        monkeypatch.setattr(module, "resolve_character_bundle", resolve_fixture)
    monkeypatch.setattr(bb, "_shared_characters", lambda beat, primitive: ["JADE"])
    monkeypatch.setattr(bb, "_phase_for_char", lambda beat, char_id: None)
    monkeypatch.setattr(bb, "_append_sublocation_ref", _no_sublocation)
    monkeypatch.setattr(bb, "_project_relative", lambda paths, p: str(p))

    class _Primitive:
        location_id = None

    board_refs, _ref_layout, _identity_sidecar_refs, _sidecar = bb._collect_board_refs(
        project_paths, _FakeBeat(), _Primitive(), []
    )

    shot = CanonicalShot(
        shot_id="EP001_SH10",
        scene_index=1,
        sequence_id=None,
        pipeline="video",
        previs_model=None,
        video_model="seeddance-2.0",
        location_id=None,
        characters=[CharacterEntry(char_id="JADE")],
        shot_type="MS",
        duration_s=3.0,
        is_env_only=False,
        has_dialogue=False,
        aspect_ratio="9:16",
        raw={"shot_id": "EP001_SH10", "asset_data": {"characters": [{"char_id": "JADE"}]}},
    )
    r2v_refs, _manifest = dp._collect_reference_images(
        shot,
        project_paths.project,
        "r2v_multi",
        batch_shots=[shot],
    )

    expected_board_refs = [str(turn_paths["front"]), str(turn_paths["profile"])]
    assert board_refs == expected_board_refs
    assert r2v_refs[0] == str(hero)
    assert set(board_refs) <= set(r2v_refs)


def test_board_hero_failclosed_not_regressed(project_paths, monkeypatch):
    """A bundle holding only a hero asset must make board ref collection raise
    ``BoardBuilderError`` rather than proceed."""
    hero_path = project_paths.project_root / "refs" / "jade_hero.png"

    def hero_only(paths, subject, phase=None):
        lone_hero = RefAsset(
            path=hero_path,
            role="identity",
            subject=subject.lower(),
            kind="identity",
            is_hero=True,
        )
        return ReferenceBundle((lone_hero,))

    monkeypatch.setattr(bb, "resolve_character_bundle", hero_only)
    monkeypatch.setattr(bb, "_shared_characters", lambda beat, primitive: ["JADE"])
    monkeypatch.setattr(bb, "_phase_for_char", lambda beat, char_id: None)

    class _Primitive:
        location_id = None

    expected_failure = pytest.raises(
        bb.BoardBuilderError,
        match="require front/hero and profile",
    )
    with expected_failure:
        bb._collect_board_refs(project_paths, _FakeBeat(), _Primitive(), [])


class _FakeBeat:
    """Minimal beat stand-in that exposes only an empty ``beat_metadata`` dict."""

    # Class-level default kept so ``_FakeBeat.beat_metadata`` still resolves.
    beat_metadata: dict = {}

    def __init__(self) -> None:
        # Give each instance its own dict: a shared class-level dict would let
        # a mutation made through one test's beat leak into every other test.
        self.beat_metadata = {}


def test_primitive_from_beat_recovers_source_text_intent(project_paths):
    """Batch shots persisted as plain dicts must be rebuilt as CanonicalShot so
    the segment intent is taken from raw.source_text and not from the
    'shot intent' placeholder."""
    shot = _shot(10, sublocation="pod_platform", location_id="int_lab")
    # Strip the explicit intent: it (rightly) outranks source_text.
    shot.pop("intent", None)
    raw = dict(shot.get("raw") or {})
    raw.pop("intent", None)
    raw["source_text"] = "Jade hooks the rebreather strap, wry smirk."
    shot["raw"] = raw

    class _B:
        beat_metadata = {"batch_shots": [shot], "shot": shot}

    primitive = bb._primitive_from_beat(project_paths.project, "EP001_CONT_004", _B())

    segments = list(primitive.timing_segments)
    first_segment = segments[0]
    assert "rebreather" in first_segment["intent"]


def test_scene_context_includes_causal_lead_in(project_paths):
    """The script span given to the board model has to reach back to the
    paragraphs BEFORE the first matched beat: that is the cross-shot causality
    per-segment slicing loses (the CONT_007 cable-grab lesson)."""
    beat_text = "The chassis moves. One arm catches the cable. The other finds Jade's throat."
    paragraphs = [
        "# [00:30 - 00:40] THE TURN",
        "Jade's boots slip. The cable sways.",
        "JADE\n(dry)\nOh great. It's alive.",
        beat_text,
        "Her feet leave the grating.",
    ]
    script_text = "\n\n".join(paragraphs) + "\n"

    episodes = project_paths.episodes_dir
    episodes.mkdir(parents=True, exist_ok=True)
    script_file = episodes / "ep_001.md"
    script_file.write_text(script_text, encoding="utf-8")

    class _B:
        beat_metadata = {
            "batch_shots": [
                {"shot_id": "EP001_SH23", "raw": {"source_text": beat_text}},
            ]
        }

    ctx = bb._scene_context_for_batch(project_paths, 1, _B())

    assert ctx is not None
    # Lead-in paragraph that precedes the matched beat.
    assert "boots slip" in ctx
    # The matched beat itself.
    assert "catches the cable" in ctx


def test_scene_context_fail_soft_when_script_missing(project_paths):
    """Asking for episode 99, for which this test writes no script, must yield
    ``None`` from the scene-context lookup."""
    unmatched_shot = {
        "shot_id": "X",
        "raw": {"source_text": "Nothing matches anything here."},
    }

    class _B:
        beat_metadata = {"batch_shots": [unmatched_shot]}

    context = bb._scene_context_for_batch(project_paths, 99, _B())
    assert context is None
