"""Tests for the REC-12 resume-time inputs_fingerprint revalidation pass.

The Phase-8 dispatch-time drift check in _build_workflow_for_beat is DEAD on
resume: run_scene filters out beats whose primary_take is already set, so a
succeeded beat is never re-dispatched and the stale take is silently reused
even after its resolved refs change (e.g. hero -> composite-sheet promotion).

revalidate_succeeded_fingerprints closes that gap: on resume it recomputes the
fingerprint from CURRENT resolved refs for every succeeded beat and demotes
(via _demote_take) any whose stored fingerprint no longer matches, so run_scene
re-dispatches them with the new refs.
"""
import dataclasses

import pytest
from unittest.mock import MagicMock, patch

from recoil.pipeline.core.take import Scene, Beat
from recoil.pipeline.core.workflow import Workflow, WorkflowStep
from recoil.pipeline.orchestrator.episode_runner import EpisodeRunner
from recoil.pipeline.orchestrator.production_types import RetryCostPolicy
from recoil.pipeline._lib.plan_loader import CanonicalShot


def _shot_dict():
    """A full CanonicalShot, serialized the way production stores it in
    beat_metadata['shot'] (dataclasses.asdict) so the resume pass can
    reconstruct it via CanonicalShot(**shot_data)."""
    shot = CanonicalShot(
        shot_id="SH01",
        scene_index=1,
        sequence_id=None,
        pipeline="video",
        previs_model=None,
        video_model="seeddance-2.0",
        location_id="L1",
        characters=[],
        shot_type="MS",
        duration_s=5.0,
        is_env_only=True,
        has_dialogue=False,
        aspect_ratio="9:16",
        raw={"shot_id": "SH01"},
    )
    return dataclasses.asdict(shot)


# ── Autouse fixture — isolate from disk I/O ────────────────────────


@pytest.fixture(autouse=True)
def _isolate(monkeypatch):
    monkeypatch.setattr(
        "recoil.pipeline.orchestrator.episode_runner.init_scenes_from_plan",
        lambda *a, **kw: None,
    )
    monkeypatch.setattr(
        "recoil.pipeline.orchestrator.episode_runner.ops_log.write",
        lambda *a, **kw: None,
    )


# ── Helpers (mirror test_episode_runner_retry_strategy.py) ─────────


def _make_runner():
    return EpisodeRunner(
        project="test_proj",
        plan={"sequences": {"seq_01": {"shots": [{"shot_id": "SH01"}]}}},
        max_takes=3,
        budget_usd=50.0,
        step_runner=MagicMock(),
        retry_cost_policy=RetryCostPolicy(max_retry_spend_usd=6.0),
    )


def _make_beat(stored_fp: str):
    return Beat(
        beat_id="SH01",
        max_takes=3,
        beat_metadata={
            "scene_id": "seq_01",
            "shot": _shot_dict(),
            "modality": "video_i2v",
            "inputs_fingerprint": stored_fp,
        },
    )


def _make_scene(beat):
    return Scene(
        scene_id="seq_01", beats=[beat],
        scene_metadata={"episode": "ep_001", "project": "test_proj"},
    )


def _add_succeeded_primary_take(beat):
    """Attach one succeeded take and select it as primary."""
    wf = Workflow(
        workflow_id="SH01_wf0",
        steps=[WorkflowStep(step_id="video", modality="video_i2v",
                            payload={"shot_id": "SH01"}, status="succeeded")],
        global_provenance={"shot_id": "SH01"},
    )
    take = beat.new_take(workflow=wf)
    take.status = "succeeded"
    beat.primary_take_id = take.take_id
    return take


# Patch target: the new revalidation pass resolves refs via a dry-run
# build_dispatch_payload. We patch it at its source module so the patch holds
# regardless of how episode_runner imports the symbol.
_BDP = "recoil.pipeline._lib.dispatch_payload.build_dispatch_payload"


# ── Test 1: ref drift on a succeeded beat → demoted + re-dispatchable ──


def test_drifted_succeeded_beat_is_demoted_on_resume():
    """A succeeded beat whose resolved refs changed since success must be
    demoted so run_scene re-dispatches it (primary_take cleared)."""
    runner = _make_runner()

    # The fingerprint stored at success time was over the OLD refs (hero).
    stored_fp = EpisodeRunner._compute_inputs_fingerprint(["/refs/hero.png"])
    beat = _make_beat(stored_fp)
    _add_succeeded_primary_take(beat)
    scene = _make_scene(beat)

    assert beat.primary_take is not None  # precondition: looks "done"

    # The CURRENT resolved refs are now a composite sheet — drift.
    fresh_payload = {"reference_images": ["/refs/composite_sheet.png"]}
    with patch(_BDP, return_value=fresh_payload):
        demoted = runner.revalidate_succeeded_fingerprints(scene)

    assert demoted == 1
    assert beat.primary_take is None  # re-dispatchable by run_scene
    # Stored fingerprint refreshed to the new refs so a second resume is a no-op.
    assert beat.beat_metadata["inputs_fingerprint"] == \
        EpisodeRunner._compute_inputs_fingerprint(["/refs/composite_sheet.png"])


# ── Test 2: no drift → NOT demoted ─────────────────────────────────


def test_unchanged_refs_succeeded_beat_not_demoted():
    """A succeeded beat whose resolved refs are unchanged must NOT be demoted."""
    runner = _make_runner()

    refs = ["/refs/hero.png"]
    stored_fp = EpisodeRunner._compute_inputs_fingerprint(refs)
    beat = _make_beat(stored_fp)
    take = _add_succeeded_primary_take(beat)
    scene = _make_scene(beat)

    fresh_payload = {"reference_images": list(refs)}
    with patch(_BDP, return_value=fresh_payload):
        demoted = runner.revalidate_succeeded_fingerprints(scene)

    assert demoted == 0
    assert beat.primary_take is take  # untouched
    assert take.status == "succeeded"


# ── Test 3: refs disappeared (drift to zero refs) → demoted ────────


def test_refs_disappeared_succeeded_beat_is_demoted_on_resume():
    """Codex REC-12 finding: a succeeded beat whose refs are REMOVED (current
    resolve returns zero refs → empty fingerprint) is still drift and must be
    demoted — not silently accepted by an `and fresh_fp` guard."""
    runner = _make_runner()

    stored_fp = EpisodeRunner._compute_inputs_fingerprint(["/refs/hero.png"])
    assert stored_fp  # precondition: the beat WAS fingerprinted over real refs
    beat = _make_beat(stored_fp)
    _add_succeeded_primary_take(beat)
    scene = _make_scene(beat)

    # Current resolve returns NO refs (the ref was removed) → fresh_fp == "".
    fresh_payload = {"reference_images": []}
    with patch(_BDP, return_value=fresh_payload):
        demoted = runner.revalidate_succeeded_fingerprints(scene)

    assert demoted == 1
    assert beat.primary_take is None  # re-dispatchable by run_scene
