"""REC-111/REC-120 reroll engine — continuity/oner identity + stale-primary clear.

New behavioral suite for Phase 6. The FROZEN reroll suites
(``test_reroll_new_take*.py``) prove byte-compat separately; this file only
exercises the new seams:
  - ``_primary_take_grouping_for_reroll`` now accepts continuity/oner.
  - ``prepare_beat_for_reroll`` clears a stale primary (REC-120) + stamps the marker.
  - ``_preflight_reroll`` cleared-stale branch derives grouping from beat metadata.
  - ``strategy_override`` reaches ``resolve_strategy`` through the build seam.
"""
from __future__ import annotations

import json
from pathlib import Path
from unittest.mock import MagicMock

import pytest

from recoil.core.naming import build_filename
from recoil.pipeline.core.persistence import scene_path
from recoil.pipeline.core.receipts import GenerationReceipt, utc_now_iso8601
from recoil.pipeline.core.registry import MODALITY_R2V_MULTI, RunResult
from recoil.pipeline.core.take import Beat, Scene
from recoil.pipeline.core.workflow import Workflow, WorkflowStep
from recoil.pipeline.orchestrator.episode_runner import RerollPreflightError
from recoil.pipeline.orchestrator.tests.test_reroll_new_take import (
    _FakeStepRunner,
    _runner,
    _shot,
    _succeeded_beat,
)


@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
    root = tmp_path / "projects"
    root.mkdir()
    (root / ".recoil-data-root").touch()
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(root))
    monkeypatch.setattr(
        "recoil.pipeline.orchestrator.episode_runner.ops_log.write",
        lambda *a, **kw: None,
    )
    from recoil.pipeline.core.dispatch import _reset_bootstrap_for_tests
    from recoil.pipeline.core.registry import _reset_for_tests

    _reset_for_tests()
    _reset_bootstrap_for_tests()
    yield
    _reset_for_tests()
    _reset_bootstrap_for_tests()


def _take_with_video(video: Path):
    """Construct a succeeded Take whose video artifact resolves to ``video``."""
    beat = Beat(beat_id="GROUPING_TEST__cov", max_takes=5, beat_metadata={})
    wf = Workflow(
        workflow_id="GROUPING_TEST__cov_take_0",
        steps=[
            WorkflowStep(
                step_id="video",
                modality=MODALITY_R2V_MULTI,
                payload={},
                status="succeeded",
                receipt=GenerationReceipt(
                    receipt_id=f"rcpt_{video.stem}",
                    modality=MODALITY_R2V_MULTI,
                    caller_id="test",
                    project="fixture",
                    episode=1,
                    shot_id="GROUPING_TEST",
                    timestamp_utc=utc_now_iso8601(),
                    run_result=RunResult(
                        id=f"run_{video.stem}",
                        modality=MODALITY_R2V_MULTI,
                        output_path=str(video),
                        success=True,
                        metadata={"take_index": 1},
                    ),
                ),
            )
        ],
        global_provenance={"shot_id": beat.beat_id},
    )
    take = beat.new_take(workflow=wf)
    take.status = "succeeded"
    return take


# ── item 1: continuity / oner grouping identity ─────────────────────────────


def test_grouping_identity_from_continuity_filename(tmp_path):
    take = _take_with_video(tmp_path / "EP001_CONT_004_SH10_11_12_take2.mp4")
    runner = _runner(_FakeStepRunner(tmp_path, MagicMock()))

    grouping = runner._primary_take_grouping_for_reroll(take)

    assert grouping["strategy"] == "continuity"
    assert grouping["ordinal"] == 4
    assert grouping["shot_ids"] == ["EP001_SH10", "EP001_SH11", "EP001_SH12"]
    assert grouping["source_pass_id"] is None


def test_oner_identity_accepted(tmp_path):
    take = _take_with_video(tmp_path / "EP001_ONER_002_SH10_11_12_take2.mp4")
    runner = _runner(_FakeStepRunner(tmp_path, MagicMock()))

    grouping = runner._primary_take_grouping_for_reroll(take)

    assert grouping["strategy"] == "oner"
    assert grouping["ordinal"] == 2
    assert grouping["source_pass_id"] is None


def test_solo_multishot_still_rejected(tmp_path):
    # Byte-compat: a solo (no grouping token) multi-shot artifact still rejects.
    take = _take_with_video(tmp_path / "EP001_SH10_11_12_take2.mp4")
    runner = _runner(_FakeStepRunner(tmp_path, MagicMock()))

    with pytest.raises(RerollPreflightError) as exc:
        runner._primary_take_grouping_for_reroll(take)

    assert exc.value.error_code == "reroll_segment_drift"
    assert "could not derive PASS identity" in str(exc.value)


def test_unparsable_still_rejected(tmp_path):
    # Byte-compat: a non-canonical filename still rejects with reroll_segment_drift.
    take = _take_with_video(tmp_path / "not-a-canonical-name.mp4")
    runner = _runner(_FakeStepRunner(tmp_path, MagicMock()))

    with pytest.raises(RerollPreflightError) as exc:
        runner._primary_take_grouping_for_reroll(take)

    assert exc.value.error_code == "reroll_segment_drift"


# ── item 2: prepare_beat_for_reroll (REC-120 stale-primary clear) ───────────


def test_prepare_beat_clears_failed_primary(tmp_path):
    # FAILED primary → cleared + persisted with the explicit marker.
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    beat.primary_take.status = "failed"
    cleared_take_id = beat.primary_take_id
    scene = Scene(scene_id="PASS_011", beats=[beat])
    path = scene_path("fixture", "ep_001", "PASS_011")
    runner = _runner(_FakeStepRunner(video_dir, MagicMock()))

    result = runner.prepare_beat_for_reroll(scene, beat)

    assert result == {
        "beat_id": beat.beat_id,
        "cleared_stale_primary": cleared_take_id,
        "next_take_index": len(beat.takes),
    }
    assert beat.primary_take_id is None
    assert beat.primary_take is None
    assert beat.beat_metadata["reroll_cleared_stale"] == cleared_take_id

    # The clear (and its marker) persisted via the existing scene save path.
    loaded = Scene.from_dict(json.loads(path.read_text()))
    assert loaded.beats[0].primary_take_id is None
    assert loaded.beats[0].beat_metadata["reroll_cleared_stale"] == cleared_take_id

    # Healthy succeeded primary (artifact present on disk) → untouched, no persist.
    healthy_dir = tmp_path / "healthy"
    healthy = _succeeded_beat(healthy_dir)
    healthy_primary_id = healthy.primary_take_id
    healthy_scene = Scene(scene_id="PASS_012", beats=[healthy])
    healthy_path = scene_path("fixture", "ep_001", "PASS_012")

    healthy_result = runner.prepare_beat_for_reroll(healthy_scene, healthy)

    assert healthy_result["cleared_stale_primary"] is None
    assert healthy.primary_take_id == healthy_primary_id
    assert "reroll_cleared_stale" not in healthy.beat_metadata
    assert not healthy_path.exists()  # no write when nothing was cleared


def test_prepare_beat_threads_loaded_version_for_stale_clear(tmp_path, monkeypatch):
    """REC-231 TOCTOU: the stale-primary clear must persist to the version the reroll
    caller LOADED (threaded expected_version), NOT the pointer re-read at save time — so a
    conform/revert between the reroll load and this clear surfaces as a version conflict
    under the lock instead of silently writing the stale scene into a newly-active version.
    """
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    beat.primary_take.status = "failed"  # stale → the clear fires
    scene = Scene(scene_id="PASS_011", beats=[beat])
    runner = _runner(_FakeStepRunner(video_dir, MagicMock()))

    seen: dict = {}

    def _spy(s, *, expected_version, dry_run=False):
        seen["expected_version"] = expected_version

    monkeypatch.setattr(runner, "_persist_active_status", _spy)

    runner.prepare_beat_for_reroll(scene, beat, expected_version=7)

    # The threaded version (7) is forwarded verbatim. A flat scene's active pointer would
    # re-derive to 1, so == 7 proves the load-version is honored, not re-read at save time.
    assert seen["expected_version"] == 7


# ── item 3: cleared-stale preflight derives grouping from beat metadata ─────


def test_cleared_stale_reroll_uses_beat_metadata_grouping(tmp_path):
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    # REC-102 grouping stamp persisted on the beat metadata.
    beat.beat_metadata["grouping"] = {
        "strategy": "continuity",
        "ordinal": 4,
        "shot_ids": ["EP001_SH23", "EP001_SH24", "EP001_SH25"],
        "source_pass_id": None,
    }
    # Make the primary stale: its video artifact is missing on disk.
    Path(beat.primary_take.workflow.steps[0].receipt.run_result.output_path).unlink()
    scene = Scene(scene_id="PASS_011", beats=[beat])
    runner = _runner(_FakeStepRunner(video_dir, MagicMock()))

    prep = runner.prepare_beat_for_reroll(scene, beat)
    assert prep["cleared_stale_primary"] is not None
    assert beat.primary_take is None
    assert beat.beat_metadata.get("reroll_cleared_stale")

    # Preflight proceeds WITHOUT a succeeded primary — grouping comes from metadata.
    target = runner._preflight_reroll(scene, target_beat_id=beat.beat_id, allow_cleared_stale=True)
    assert target is beat

    # Prove the collision preflight used the metadata-derived continuity grouping:
    # a take-2 file at exactly that identity is flagged as a collision.
    collision = video_dir / build_filename(
        episode=1,
        strategy="continuity",
        ordinal=4,
        shot_ids=["EP001_SH23", "EP001_SH24", "EP001_SH25"],
        take=len(beat.takes) + 1,
    )
    collision.parent.mkdir(parents=True, exist_ok=True)
    collision.write_bytes(b"existing-continuity-reroll")

    with pytest.raises(RerollPreflightError) as exc:
        runner._preflight_reroll(scene, target_beat_id=beat.beat_id, allow_cleared_stale=True)

    assert exc.value.error_code == "reroll_collision"


def test_cleared_stale_reroll_without_grouping_errors(tmp_path):
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    # No usable grouping stamp in metadata.
    beat.beat_metadata.pop("grouping", None)
    Path(beat.primary_take.workflow.steps[0].receipt.run_result.output_path).unlink()
    scene = Scene(scene_id="PASS_011", beats=[beat])
    runner = _runner(_FakeStepRunner(video_dir, MagicMock()))

    runner.prepare_beat_for_reroll(scene, beat)
    assert beat.primary_take is None
    assert beat.beat_metadata.get("reroll_cleared_stale")

    with pytest.raises(RerollPreflightError) as exc:
        runner._preflight_reroll(scene, target_beat_id=beat.beat_id, allow_cleared_stale=True)

    assert exc.value.error_code == "reroll_identity_unresolvable"


# ── item 4: strategy_override reaches resolve_strategy through the build seam ─


def test_strategy_override_reaches_resolver(monkeypatch):
    from recoil.pipeline._lib import dispatch_payload

    class _StopResolve(Exception):
        pass

    recorded: dict = {}

    def _recording_resolve(primitive, *, explicit=None, model_id, requested_modality=None):
        recorded["explicit"] = explicit
        raise _StopResolve()

    monkeypatch.setattr(dispatch_payload, "resolve_strategy", _recording_resolve)
    shots = [_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]

    # Override flows: build_dispatch_payload → PayloadContext.strategy_override →
    # _build_author_aware_prompt → resolve_strategy(explicit=...).
    with pytest.raises(_StopResolve):
        dispatch_payload.build_dispatch_payload(
            shot=shots[0],
            project="fixture",
            modality="r2v_multi",
            episode="ep_001",
            batch_shots=shots,
            strategy_override="shot_spec",
        )
    assert recorded["explicit"] == "shot_spec"

    # No override → None reaches the resolver (default everywhere).
    recorded.clear()
    with pytest.raises(_StopResolve):
        dispatch_payload.build_dispatch_payload(
            shot=shots[0],
            project="fixture",
            modality="r2v_multi",
            episode="ep_001",
            batch_shots=shots,
        )
    assert recorded["explicit"] is None



def test_cleared_stale_empty_shot_ids_no_crash(tmp_path):
    """Merge-gate r6: metadata-empty shot_ids must not crash build_filename."""
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    beat.beat_metadata["grouping"] = {
        "strategy": "continuity",
        "ordinal": 4,
        "shot_ids": [],
        "source_pass_id": None,
    }
    beat.beat_metadata.setdefault("shot", {})["shot_id"] = "EP001_SH10"
    Path(beat.primary_take.workflow.steps[0].receipt.run_result.output_path).unlink()
    scene = Scene(scene_id="PASS_011", beats=[beat])
    runner = _runner(_FakeStepRunner(video_dir, MagicMock()))
    runner.prepare_beat_for_reroll(scene, beat)
    target = runner._preflight_reroll(scene, target_beat_id=beat.beat_id, allow_cleared_stale=True)
    assert target is beat


def test_marker_alone_does_not_unlock_legacy_preflight(tmp_path):
    """Merge-gate r8: a persisted reroll_cleared_stale marker must NOT bypass
    reroll_requires_succeeded_primary for legacy callers (no allow flag)."""
    import pytest
    from recoil.pipeline.orchestrator.episode_runner import RerollPreflightError

    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    beat.beat_metadata["grouping"] = {
        "strategy": "continuity", "ordinal": 4,
        "shot_ids": ["EP001_SH23"], "source_pass_id": None,
    }
    beat.beat_metadata["reroll_cleared_stale"] = "stale_take"
    Path(beat.primary_take.workflow.steps[0].receipt.run_result.output_path).unlink()
    beat.primary_take_id = None
    scene = Scene(scene_id="PASS_011", beats=[beat])
    runner = _runner(_FakeStepRunner(video_dir, MagicMock()))
    with pytest.raises(RerollPreflightError) as exc:
        runner._preflight_reroll(scene, target_beat_id=beat.beat_id)
    assert exc.value.error_code == "reroll_requires_succeeded_primary"
    # with the explicit flag (new entry points) it proceeds:
    assert runner._preflight_reroll(
        scene, target_beat_id=beat.beat_id, allow_cleared_stale=True
    ) is beat
