from __future__ import annotations

import asyncio
import hashlib
import json
from pathlib import Path

import pytest

from recoil.core.paths import ProjectPaths as CoreProjectPaths
from recoil.pipeline._lib import derivation_manifest
from recoil.pipeline._lib.derivation_sha import content_sha, plan_structural_sha
from recoil.pipeline._lib.plan_loader import CanonicalPlan, load_plan
from recoil.pipeline.cli import generate
from recoil.pipeline.core.persistence import (
    load_manifest, load_scene, save_scene, scene_path, scene_version_path,
)
from recoil.pipeline.orchestrator.batch_selector import parse_batch_selector
from recoil.pipeline.orchestrator.episode_runner import EpisodeRunner


# Project slug; the autouse fixture creates a directory with this name under
# the temporary projects root.
PROJECT = "fixture"
# Integer episode number handed to generate._run_from_script_rederive.
EPISODE_NUM = 1
# Episode id string used for persistence lookups and as the plan's "episode_id".
EPISODE = "ep_001"


@pytest.fixture(autouse=True)
def _projects_root(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
    """Build an isolated projects tree for each test and return the project dir.

    Creates ``<tmp_path>/projects`` containing a ``.recoil-data-root`` marker
    file and an empty ``PROJECT`` subdirectory, then sets
    ``RECOIL_PROJECTS_ROOT`` to that tree and ``RECOIL_BOARD_GATE`` to ``"0"``
    for the duration of the test.
    """
    projects_dir = tmp_path / "projects"
    projects_dir.mkdir()
    (projects_dir / ".recoil-data-root").touch()

    fixture_dir = projects_dir / PROJECT
    fixture_dir.mkdir()

    environment = (
        ("RECOIL_PROJECTS_ROOT", str(projects_dir)),
        ("RECOIL_BOARD_GATE", "0"),
    )
    for variable, value in environment:
        monkeypatch.setenv(variable, value)
    return fixture_dir


def _text_hash(text: str) -> str:
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def _raw_shot(
    index: int,
    *,
    source_text: str,
    location_id: str,
    scene_index: int,
) -> dict:
    """Return one raw plan shot dict for shot number ``index``.

    The shot id is ``EP001_SH`` followed by the zero-padded index. The
    pipeline, model, aspect ratio, prompt and routing fields are fixed; only
    the location, scene index and source text vary. ``source_text_hash`` is
    derived from ``source_text`` via ``_text_hash``.
    """
    routing = {
        "target_editorial_duration_s": 2.0,
        "is_env_only": True,
        "has_dialogue": False,
    }
    assets = {"location_id": location_id, "characters": []}

    # Keys are inserted in the same order as the serialized plan expects.
    shot: dict = {
        "shot_id": f"EP001_SH{index:02d}",
        "scene_index": scene_index,
        "pipeline": "video",
        "video_model": "seeddance-2.0",
        "aspect_ratio": "9:16",
    }
    shot["asset_data"] = assets
    shot["prompt_data"] = {"shot_type": "MS"}
    shot["routing_data"] = routing
    shot["source_text"] = source_text
    shot["source_text_hash"] = _text_hash(source_text)
    return shot


def _plan_raw(kind: str, *, target_text: str) -> dict:
    if kind == "cont":
        scene_indexes = [1, 2, 3, 4, 5, 6]
    elif kind == "oner":
        scene_indexes = [1, 1, 1, 2, 2, 2]
    else:  # pragma: no cover - test helper guard
        raise AssertionError(kind)

    shots = []
    for index in range(1, 7):
        is_target = index <= 3
        shots.append(
            _raw_shot(
                index,
                source_text=(
                    f"{target_text} target shot {index}"
                    if is_target
                    else f"unchanged sibling shot {index}"
                ),
                location_id="LOC_A" if is_target else "LOC_B",
                scene_index=scene_indexes[index - 1],
            )
        )
    return {
        "episode_id": EPISODE,
        "project": PROJECT,
        "total_shots": len(shots),
        "shots": shots,
    }


def _write_plan(raw: dict) -> CanonicalPlan:
    """Write ``raw`` as ``ep_001_plan.json`` in the project's plans dir and load it.

    The plans directory is created if missing. The JSON is written with a
    two-space indent as UTF-8, and the result of ``load_plan`` on the written
    file is returned.
    """
    plans_dir = CoreProjectPaths.for_project(PROJECT).plans_dir
    plans_dir.mkdir(parents=True, exist_ok=True)

    destination = plans_dir / "ep_001_plan.json"
    serialized = json.dumps(raw, indent=2)
    destination.write_text(serialized, encoding="utf-8")
    return load_plan(destination)


def _seed_episode(kind: str) -> tuple[CanonicalPlan, dict, str, list[str]]:
    """Seed the project with scenes derived from the "old script" plan.

    Writes the old plan, runs ``EpisodeRunner.run_episode_batches`` with
    ``derive_only=True`` using the grouping that matches ``kind``, and asserts
    that exactly the two expected scene ids were written.

    Returns:
        ``(old_plan, new_plan_raw, target_scene_id, all_scene_ids)``, where
        ``new_plan_raw`` is a "new script" raw plan that has not been written
        to disk and ``target_scene_id`` is the first written scene id.
    """
    is_continuity = kind == "cont"
    seeded_plan = _write_plan(_plan_raw(kind, target_text="old script"))

    runner = EpisodeRunner(
        project=PROJECT,
        plan=seeded_plan.raw,
        episode=EPISODE,
        budget_usd=25.0,
    )
    derivation = runner.run_episode_batches(
        seeded_plan,
        derive_only=True,
        grouping="continuity" if is_continuity else "oner",
    )
    outcome = asyncio.run(derivation)

    scene_ids = list(outcome["written"])
    if is_continuity:
        expected_ids = ["BATCH_001", "BATCH_002"]
    else:
        expected_ids = ["ONER_001", "ONER_002"]
    assert scene_ids == expected_ids

    revised_raw = _plan_raw(kind, target_text="new script")
    return seeded_plan, revised_raw, scene_ids[0], scene_ids


def _sha256(path: Path) -> str:
    return hashlib.sha256(path.read_bytes()).hexdigest()


def _file_snapshot(root: Path) -> dict[str, str]:
    """Map each regular file under ``root`` to its SHA-256 digest.

    Keys are the file paths relative to ``root`` (as strings). Directories are
    skipped. Entries are visited in sorted path order.
    """
    snapshot: dict[str, str] = {}
    for entry in sorted(root.rglob("*")):
        if not entry.is_file():
            continue
        relative = str(entry.relative_to(root))
        snapshot[relative] = _sha256(entry)
    return snapshot


def _scene_texts(scene_id: str) -> dict[str, str]:
    """Return ``shot_id -> source_text`` for the scene file ``scene_path`` resolves."""
    scene_file = scene_path(PROJECT, EPISODE, scene_id)
    return _scene_texts_at(scene_file)


def _scene_texts_at(path: Path) -> dict[str, str]:
    """Load the scene at ``path`` and return ``shot_id -> source_text``.

    For each beat, the shots come from ``beat_metadata["batch_shots"]`` when
    that is truthy, otherwise from the single ``beat_metadata["shot"]`` entry.
    Falsy shot entries are skipped. A shot with no ``raw`` dict or no
    ``source_text`` maps to the empty string.
    """
    collected: dict[str, str] = {}
    for beat in load_scene(path).beats:
        meta = beat.beat_metadata or {}
        # Fall back to the single-shot slot when there is no batch list.
        candidates = meta.get("batch_shots") or [meta.get("shot")]
        collected.update(
            (entry["shot_id"], (entry.get("raw") or {}).get("source_text", ""))
            for entry in candidates
            if entry
        )
    return collected


def _install_fake_upstream(
    monkeypatch: pytest.MonkeyPatch,
    *,
    new_plan_raw: dict,
    calls: list[str],
) -> None:
    """Replace the upstream stages reached through ``generate`` with local fakes.

    Patches ``generate.IngestPipeline`` with ``FakeIngestPipeline`` and
    ``generate._run_gate`` with ``fake_gate``. Every fake entry point appends a
    tag to ``calls`` before doing anything else, so tests can assert both which
    stages ran and in what order.

    Args:
        monkeypatch: pytest monkeypatch used to install the replacements.
        new_plan_raw: raw plan the fake storyboard pass writes and returns.
        calls: list mutated in place with ``"camera_test"``, ``"bible"``,
            ``"plan"`` and ``"extract"`` as the corresponding fakes are invoked.
    """

    class FakeIngestPipeline:
        """Stand-in for the ingest pipeline that writes small fixture artifacts."""

        def __init__(self, *, project: str, project_root: Path) -> None:
            # Stored only; the fake methods below use the module-level PROJECT.
            self.project = project
            self.project_root = project_root

        def run_camera_test(self, episode_num: int):  # noqa: ANN001
            """Write a fake camera-tested cache file and stamp its manifest stage."""
            calls.append("camera_test")
            cache = (
                CoreProjectPaths.for_project(PROJECT).visual_state_dir
                / "camera_tested"
                / f"ep_{episode_num:03d}.json"
            )
            cache.parent.mkdir(parents=True, exist_ok=True)
            payload = {"episode": f"ep_{episode_num:03d}", "source": "fake"}
            cache.write_text(json.dumps(payload, indent=2), encoding="utf-8")
            # Stamp after the file exists so the manifest entry describes it.
            derivation_manifest.stamp_stage(
                PROJECT,
                episode_num,
                "camera_tested",
                kind="derived",
                content_sha=content_sha(payload),
                structural_sha=None,
                source={"script_sha": "fake-script-sha"},
                builder="test.camera_test",
            )
            return payload

        def _load_bible(self):  # noqa: ANN001
            """Record the call and return an empty bible."""
            calls.append("bible")
            return {}

        def run_storyboard_pass(self, episode_num: int, bible):  # noqa: ANN001
            """Write ``new_plan_raw`` as the episode plan and stamp the plan stage."""
            calls.append("plan")
            plan = _write_plan(new_plan_raw)
            derivation_manifest.stamp_stage(
                PROJECT,
                episode_num,
                "plan",
                kind="derived",
                content_sha=content_sha(new_plan_raw),
                structural_sha=plan_structural_sha(new_plan_raw),
                source={"camera_tested_content_sha": "fake-camera"},
                builder="test.plan",
                extra={"shot_ids": [shot.shot_id for shot in plan.shots]},
            )
            return new_plan_raw

    def fake_gate(project: str, episode: int) -> int:
        """Write a fake ``mention_ledger.json`` and report ``generate.EXIT_OK``."""
        calls.append("extract")
        path = CoreProjectPaths.for_project(project).episode_breakdown_dir(episode)
        path.mkdir(parents=True, exist_ok=True)
        (path / "mention_ledger.json").write_text(
            json.dumps({"episode": episode, "source": "fake"}, indent=2),
            encoding="utf-8",
        )
        return generate.EXIT_OK

    monkeypatch.setattr(generate, "IngestPipeline", FakeIngestPipeline)
    monkeypatch.setattr(generate, "_run_gate", fake_gate)


def _trap_paid_dispatch(monkeypatch: pytest.MonkeyPatch, calls: list[str]) -> None:
    """Make every paid dispatch entry point record its call and fail the test.

    ``generate.EpisodeRunner.run_scene`` is replaced with a coroutine that
    appends the scene id to ``calls`` and raises. Both
    ``generate.build_and_dispatch_board`` and ``generate.render_board_finish``
    are replaced with a function that appends ``"board"`` and raises.
    """

    async def forbidden_run_scene(self, scene, **kwargs):  # noqa: ANN001, ANN003
        calls.append(scene.scene_id)
        raise AssertionError("run_scene must not be called by --from-script")

    def forbidden_board(*args, **kwargs):  # noqa: ANN002, ANN003
        calls.append("board")
        raise AssertionError("board render must not be called by --from-script")

    monkeypatch.setattr(generate.EpisodeRunner, "run_scene", forbidden_run_scene)
    for attribute in ("build_and_dispatch_board", "render_board_finish"):
        monkeypatch.setattr(generate, attribute, forbidden_board)


@pytest.mark.parametrize(
    ("kind", "selector_token", "expected_grouping", "expected_target"),
    [
        ("cont", "EP001_CONT_001", "continuity", "BATCH_001"),
        ("oner", "EP001_ONER_001", "oner", "ONER_001"),
    ],
)
def test_from_script_refreshes_target_only_and_is_idempotent(
    monkeypatch: pytest.MonkeyPatch,
    kind: str,
    selector_token: str,
    expected_grouping: str,
    expected_target: str,
) -> None:
    """A from-script re-derive targets one scene and is stable when repeated.

    Seeds an "old script" episode, runs the re-derive against a "new script"
    plan, and checks that only the selected scene id is passed to
    ``run_episode_batches``, that the flat target body and all sibling bodies
    keep their bytes, that the new text lands in the v2 candidate, and that no
    paid dispatch entry point is called. A second run must leave the same
    files byte-identical.
    """
    _old_plan, new_raw, target_id, scene_ids = _seed_episode(kind)
    assert target_id == expected_target
    selector = parse_batch_selector(selector_token)
    assert selector is not None

    target_path = scene_path(PROJECT, EPISODE, target_id)
    sibling_paths = [
        scene_path(PROJECT, EPISODE, scene_id)
        for scene_id in scene_ids
        if scene_id != target_id
    ]
    target_before = _sha256(target_path)
    siblings_before = {path: _sha256(path) for path in sibling_paths}
    # all() over an empty mapping is vacuously true, so require that the
    # seeded scene exposes shot texts before checking their content.
    seeded_texts = _scene_texts(target_id)
    assert seeded_texts, "seeded target scene exposes no shot texts"
    assert all(text.startswith("old script") for text in seeded_texts.values())

    upstream_calls: list[str] = []
    paid_calls: list[str] = []
    batch_kwargs: list[dict] = []
    _install_fake_upstream(monkeypatch, new_plan_raw=new_raw, calls=upstream_calls)
    _trap_paid_dispatch(monkeypatch, paid_calls)

    original_run_batches = generate.EpisodeRunner.run_episode_batches

    async def spy_run_batches(self, canonical_plan, **kwargs):  # noqa: ANN001, ANN003
        # Record the keyword arguments, then defer to the real implementation.
        batch_kwargs.append(dict(kwargs))
        return await original_run_batches(self, canonical_plan, **kwargs)

    monkeypatch.setattr(generate.EpisodeRunner, "run_episode_batches", spy_run_batches)

    rc = generate._run_from_script_rederive(
        PROJECT,
        EPISODE_NUM,
        selector,
        dry_run=False,
    )

    assert rc == generate.EXIT_OK
    assert upstream_calls == ["camera_test", "bible", "plan", "extract"]
    assert paid_calls == []
    assert batch_kwargs[-1]["grouping"] == expected_grouping
    assert batch_kwargs[-1]["only_scene_ids"] == {target_id}
    # REC-231 Phase 2: re-derivation no longer refreshes the active body IN PLACE — it
    # appends a v2 candidate and leaves the pointer at v1. So the flat/v1 body is
    # byte-PRESERVED (still "old script"); the "new script" lands in the v2 candidate
    # (the operator conforms to it via the Phase 6 loop, not in-place here).
    assert _sha256(target_path) == target_before
    assert {
        path: _sha256(path)
        for path in sibling_paths
    } == siblings_before
    assert load_manifest(PROJECT, EPISODE, target_id)["active_version"] == 1
    active_texts = _scene_texts(target_id)
    assert active_texts, "target scene exposes no shot texts after re-derive"
    assert all(text.startswith("old script") for text in active_texts.values())
    # Guard against an empty candidate passing the content check vacuously.
    candidate_texts = _scene_texts_at(
        scene_version_path(PROJECT, EPISODE, target_id, 2)
    )
    assert candidate_texts, "v2 candidate exposes no shot texts"
    assert all(text.startswith("new script") for text in candidate_texts.values())

    after_first = _sha256(target_path)
    rc_second = generate._run_from_script_rederive(
        PROJECT,
        EPISODE_NUM,
        selector,
        dry_run=False,
    )

    assert rc_second == generate.EXIT_OK
    assert _sha256(target_path) == after_first
    assert {
        path: _sha256(path)
        for path in sibling_paths
    } == siblings_before
    assert paid_calls == []


def test_from_script_dry_run_mutates_nothing(
    monkeypatch: pytest.MonkeyPatch,
    _projects_root: Path,
) -> None:
    """A dry-run re-derive leaves the tree untouched and calls nothing upstream.

    After seeding, every file under the project directory is hashed. The
    dry run must return ``EXIT_OK`` without invoking any fake upstream stage,
    without calling ``generate.acquire_episode_lock``, and without changing
    the file snapshot.
    """
    _, revised_raw, _, _ = _seed_episode("cont")
    selector = parse_batch_selector("EP001_CONT_001")
    assert selector is not None

    def forbid_lock(*args, **kwargs):  # noqa: ANN002, ANN003
        return pytest.fail("dry-run must not take the lock")

    snapshot_before = _file_snapshot(_projects_root)
    recorded_calls: list[str] = []
    _install_fake_upstream(
        monkeypatch,
        new_plan_raw=revised_raw,
        calls=recorded_calls,
    )
    monkeypatch.setattr(generate, "acquire_episode_lock", forbid_lock)

    exit_code = generate._run_from_script_rederive(
        PROJECT,
        EPISODE_NUM,
        selector,
        dry_run=True,
    )

    assert exit_code == generate.EXIT_OK
    assert recorded_calls == []
    snapshot_after = _file_snapshot(_projects_root)
    assert snapshot_after == snapshot_before


def test_from_script_locked_target_appends_candidate(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """REC-231 Phase 7: a from-script re-derive of a LOCKED target APPENDS a v2 candidate
    with NO force needed — the ``--force-scene-overwrite`` flag and the lock-as-clobber
    skip are DELETED, so ``scene.locked`` is inert. The locked flat body (v1) is
    byte-PRESERVED, the pointer stays at v1, the "new script" lands in the v2 candidate,
    and no ``.pre-force-*.bak`` backup is ever written (no destructive overwrite path)."""
    _old_plan, new_raw, target_id, _scene_ids = _seed_episode("cont")
    selector = parse_batch_selector("EP001_CONT_001")
    assert selector is not None
    target_path = scene_path(PROJECT, EPISODE, target_id)
    # Mark the seeded scene as locked on disk before re-deriving.
    locked_scene = load_scene(target_path)
    locked_scene.locked = True
    locked_scene.lock_reason = "manual staging"
    save_scene(locked_scene, target_path)
    locked_before = _sha256(target_path)

    upstream_calls: list[str] = []
    paid_calls: list[str] = []
    _install_fake_upstream(monkeypatch, new_plan_raw=new_raw, calls=upstream_calls)
    _trap_paid_dispatch(monkeypatch, paid_calls)

    rc = generate._run_from_script_rederive(
        PROJECT,
        EPISODE_NUM,
        selector,
        dry_run=False,
    )

    assert rc == generate.EXIT_OK
    # The locked flat body (v1) is byte-PRESERVED; the candidate appends as v2 and the
    # pointer never moves (lock is inert — no clobber, no backup).
    assert _sha256(target_path) == locked_before
    assert load_manifest(PROJECT, EPISODE, target_id)["active_version"] == 1
    # all() over an empty mapping is vacuously true, so require shot texts to
    # be present before checking what they start with.
    active_texts = _scene_texts(target_id)
    assert active_texts, "locked target scene exposes no shot texts"
    assert all(text.startswith("old script") for text in active_texts.values())
    candidate_texts = _scene_texts_at(
        scene_version_path(PROJECT, EPISODE, target_id, 2)
    )
    assert candidate_texts, "v2 candidate exposes no shot texts"
    assert all(text.startswith("new script") for text in candidate_texts.values())
    backups = sorted(target_path.parent.glob(target_path.name + ".pre-force-*.bak"))
    assert not backups  # append path takes no destructive backup
    assert paid_calls == []
