"""REC-231 Phase 3 — conform/revert (sole pointer movers) + structure_fingerprint.

Acceptance tests for the pointer-mover verbs added to SceneVersionStore, the
deterministic structural fingerprint wired into every manifest write, and the
load-bearing fcntl serialization of the shared _locked_manifest_update helper.
"""
import threading

import pytest

from recoil.pipeline.core.persistence import (
    load_manifest,
    load_scene,
    load_scene_active,
    save_scene,
    scene_path,
    scene_version_path,
    structure_fingerprint,
)
from recoil.pipeline.core.scene_version_store import SceneVersionStore
from recoil.pipeline.core.take import Beat, Scene
from recoil.pipeline.core.workflow import Workflow, WorkflowStep


def _make_wf(wid="wf"):
    """Return a single-step ``Workflow`` whose ``workflow_id`` is *wid*.

    The lone step has id ``kf``, modality ``image_t2i`` and a placeholder prompt.
    """
    only_step = WorkflowStep(
        step_id="kf",
        modality="image_t2i",
        payload={"prompt": "x"},
    )
    return Workflow(workflow_id=wid, steps=[only_step])


def _downstream(manifest, version):
    """Return the ``downstream`` value of the first manifest entry numbered *version*.

    Raises ``StopIteration`` when no entry in ``manifest["versions"]`` matches.
    """
    matching = (entry for entry in manifest["versions"] if entry["version"] == version)
    entry = next(matching)
    return entry["downstream"]


def _materialize_v1_v2(project, episode, batch_id):
    """Save a flat one-beat scene, then append a two-beat candidate through a store.

    Returns the ``SceneVersionStore`` used for the append. The tests in this module
    rely on the result being a derived v1 (the flat body), a candidate v2, and the
    active pointer still at v1.
    """
    flat_scene = Scene(scene_id=batch_id, beats=[Beat(beat_id="OPENING")])
    save_scene(flat_scene, scene_path(project, episode, batch_id))

    store = SceneVersionStore(project, episode)
    candidate_scene = Scene(scene_id=batch_id, beats=[Beat("A"), Beat("B")])
    store.write_scene_candidate(batch_id, candidate_scene)
    return store


# ── conform / revert (sole pointer movers) ───────────────────────────────────


def test_conform_moves_pointer_to_candidate(tmp_path, monkeypatch):
    """conform(batch, 2) moves active_version to 2, makes load_scene_active return
    v2's beats, marks v2 approved, and persists the new pointer to disk."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    key = ("p", "ep_001", "BATCH_001")
    store = _materialize_v1_v2(*key)
    assert load_manifest(*key)["active_version"] == 1

    updated = store.conform("BATCH_001", 2)
    assert updated["active_version"] == 2

    # The live loader now resolves v2's structure (two beats A, B).
    active_beat_ids = [b.beat_id for b in load_scene_active(*key).beats]
    assert active_beat_ids == ["A", "B"]

    # The target version's entry is flagged approved in the returned manifest.
    target_entry = next(entry for entry in updated["versions"] if entry["version"] == 2)
    assert target_entry["state"] == "approved"

    # A fresh read from disk agrees with the returned dict.
    reloaded = load_manifest(*key)
    assert reloaded["active_version"] == 2


def test_revert_is_lossless_pointer_only(tmp_path, monkeypatch):
    """After conform to v2, revert(batch, 1) brings v1's beats back through the active
    loader while both version bodies stay byte-identical on disk."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    key = ("p", "ep_001", "BATCH_001")
    store = _materialize_v1_v2(*key)
    store.conform("BATCH_001", 2)

    # Snapshot both bodies: the flat artifact holds v1, the .v002.json holds v2.
    body_paths = (scene_path(*key), scene_version_path(*key, 2))
    snapshots = [path.read_bytes() for path in body_paths]

    manifest = store.revert("BATCH_001", 1)
    assert manifest["active_version"] == 1

    # v1's structure (the single opening beat) is what the active loader returns.
    restored_ids = [b.beat_id for b in load_scene_active(*key).beats]
    assert restored_ids == ["OPENING"]

    # Lossless: neither body changed across the revert.
    for path, snapshot in zip(body_paths, snapshots):
        assert path.read_bytes() == snapshot


def test_conform_requires_manifest_and_registered_version(tmp_path, monkeypatch):
    """conform raises ValueError when the batch has no manifest; conform and revert
    both raise ValueError for a version number that is not registered."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    store = SceneVersionStore("p", "ep_001")

    # Nothing has been written yet, so there is no manifest to conform against.
    with pytest.raises(ValueError):
        store.conform("BATCH_001", 1)

    # With v1 + v2 registered, v9 is still unknown to both pointer movers.
    _materialize_v1_v2("p", "ep_001", "BATCH_001")
    for move_pointer in (store.conform, store.revert):
        with pytest.raises(ValueError):
            move_pointer("BATCH_001", 9)


def test_revert_to_derived_version_unblocked(tmp_path, monkeypatch):
    """Pointer moves leave ``downstream`` alone: v1 stays derived and v2 stays
    not_derived across a conform to v2 and a revert back to v1."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    key = ("p", "ep_001", "BATCH_001")
    store = _materialize_v1_v2(*key)

    def downstream_pair(manifest):
        # (v1 downstream, v2 downstream) for compact comparison
        return _downstream(manifest, 1), _downstream(manifest, 2)

    # legacy v1 is born derived, the fresh candidate v2 is born not_derived
    untouched = ("derived", "not_derived")
    assert downstream_pair(load_manifest(*key)) == untouched

    # conform to v2 moves the pointer only
    after_conform = store.conform("BATCH_001", 2)
    assert after_conform["active_version"] == 2
    assert downstream_pair(after_conform) == untouched

    # revert to v1 moves the pointer back, again without recomputing downstream
    after_revert = store.revert("BATCH_001", 1)
    assert after_revert["active_version"] == 1
    assert downstream_pair(after_revert) == untouched


def test_conform_missing_body_fails_closed_pointer_unmoved(tmp_path, monkeypatch):
    """conform onto a registered version whose body file was deleted raises
    FileNotFoundError and the persisted active pointer stays at v1."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    key = ("p", "ep_001", "BATCH_001")
    store = _materialize_v1_v2(*key)

    # Remove v2's body while its manifest entry remains registered.
    v2_body = scene_version_path(*key, 2)
    v2_body.unlink()

    with pytest.raises(FileNotFoundError):
        store.conform("BATCH_001", 2)

    persisted = load_manifest(*key)
    assert persisted["active_version"] == 1


def test_revert_missing_body_fails_closed_pointer_unmoved(tmp_path, monkeypatch):
    """revert onto a registered version whose body file was deleted raises
    FileNotFoundError and the persisted active pointer stays at v2."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    key = ("p", "ep_001", "BATCH_001")
    store = _materialize_v1_v2(*key)
    store.conform("BATCH_001", 2)

    # Remove the flat artifact (v1's body) after the pointer has moved to v2.
    v1_body = scene_path(*key)
    v1_body.unlink()

    with pytest.raises(FileNotFoundError):
        store.revert("BATCH_001", 1)

    persisted = load_manifest(*key)
    assert persisted["active_version"] == 2


# ── structure_fingerprint: determinism + manifest wiring ─────────────────────


def test_fingerprint_same_structure_compatible():
    """Two independently built scenes with the same beat ids and shot metadata
    produce equal fingerprints."""
    def build_scene():
        # Fresh objects on every call so the two scenes share no state.
        return Scene(scene_id="BATCH_001", beats=[
            Beat(beat_id="B0", beat_metadata={"shot": {"prompt": "wide street"}}),
            Beat(beat_id="B1", beat_metadata={"shot": {"prompt": "close face"}}),
        ])

    first, second = build_scene(), build_scene()
    assert structure_fingerprint(first) == structure_fingerprint(second)


def test_fingerprint_canonical_nested_key_order():
    """The insertion order of keys inside a nested metadata dict does not affect the
    fingerprint."""
    def scene_with_shot(shot):
        return Scene(scene_id="BATCH_001", beats=[
            Beat(beat_id="B0", beat_metadata={"shot": shot})])

    ascending = scene_with_shot({"a": 1, "b": 2, "c": 3})
    descending = scene_with_shot({"c": 3, "b": 2, "a": 1})
    assert structure_fingerprint(ascending) == structure_fingerprint(descending)


def test_fingerprint_populated_in_manifest(tmp_path, monkeypatch):
    """After a flat save plus one write_scene_candidate, both manifest entries carry a
    non-null structure_fingerprint equal to the fingerprint of that version's on-disk
    body, and a second manifest read returns the same values."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    key = ("p", "ep_001", "BATCH_001")

    opening = Beat(beat_id="OPENING", beat_metadata={"shot": {"prompt": "open"}})
    flat_scene = Scene(scene_id="BATCH_001", beats=[opening])
    save_scene(flat_scene, scene_path(*key))

    candidate_beats = [Beat("A", beat_metadata={"shot": {"prompt": "a"}}), Beat("B")]
    candidate = Scene(scene_id="BATCH_001", beats=candidate_beats)
    SceneVersionStore("p", "ep_001").write_scene_candidate("BATCH_001", candidate)

    entries = load_manifest(*key)["versions"]
    recorded_v1 = entries[0]["structure_fingerprint"]
    recorded_v2 = entries[1]["structure_fingerprint"]

    # Neither entry is left null.
    assert recorded_v1 is not None
    assert recorded_v2 is not None

    # Each recorded value matches the fingerprint of the body actually on disk
    # (the flat artifact is v1's body).
    on_disk_v1 = load_scene(scene_path(*key))
    on_disk_v2 = load_scene(scene_version_path(*key, 2))
    assert recorded_v1 == structure_fingerprint(on_disk_v1)
    assert recorded_v2 == structure_fingerprint(on_disk_v2)

    # A second read of the manifest yields the same fingerprints.
    reread = load_manifest(*key)["versions"]
    assert reread[0]["structure_fingerprint"] == recorded_v1
    assert reread[1]["structure_fingerprint"] == recorded_v2


def test_fingerprint_ignores_volatile_fields():
    """Changing only timestamps, approval, takes, board, counters, lock state and the
    inputs_fingerprint metadata value leaves the structural fingerprint unchanged."""
    scene = Scene(scene_id="BATCH_001", beats=[
        Beat(beat_id="B0", beat_metadata={"shot": {"prompt": "wide"},
                                          "inputs_fingerprint": "digest-0"})])
    baseline = structure_fingerprint(scene)

    stamp = "2099-01-01T00:00:00Z"
    beat = scene.beats[0]

    # scene- and beat-level timestamps
    scene.created_at = stamp
    beat.created_at = stamp

    # approval state, a new take and the primary-take pointer
    beat.approved = True
    beat.new_take(workflow=_make_wf("t0"))
    beat.primary_take_id = beat.takes[0].take_id

    # board record, counters and lock state
    beat.board = {"status": "approved", "artifact": "char/x.png",
                  "source_sha256": "deadbeef", "approved_by": "jt",
                  "updated_at": "2099-01-01T00:00:00Z"}
    beat.phantom_recovery_count = 2
    beat.max_takes = 9
    scene.locked = True
    scene.lock_reason = "staging"

    # the inputs_fingerprint digest must not feed the structural fingerprint
    beat.beat_metadata["inputs_fingerprint"] = "totally-different-digest"

    assert structure_fingerprint(scene) == baseline


# ── Parametrized projection coverage (closes the underspecified-projection MAJOR) ──

# init-style beat carries `plan_shot`; grouped/re-derived beat carries `shot`/`batch_shots`
# — both structural shapes covered. `scene_id` lives on both beats.
# beat_metadata keys whose mutation is expected to CHANGE the fingerprint.
_STRUCTURAL_KEYS = [
    "plan_shot",
    "shot",
    "batch_shots",
    "batch_summary",
    "grouping",
    "modality",
    "generation_config",
    "element_config",
    "prompt_directive",
    "scene_id",
]
# Fields (dispatched on by _mutate_excluded) whose mutation must NOT change it.
_EXCLUDED_FIELDS = [
    "inputs_fingerprint",
    "takes",
    "board",
    "approved",
    "primary_take_id",
    "timestamps",
    "phantom_recovery_count",
    "max_takes",
    "locked",
]


def _structural_base_scene():
    """Build a fresh two-beat scene covering both beat_metadata shapes under test.

    ``B0`` carries ``plan_shot`` (init-style); ``B1`` carries ``shot``/``batch_shots``
    and the remaining structural keys plus a volatile ``inputs_fingerprint``. Every
    call returns new objects, so callers may mutate the result freely.
    """
    init_style_metadata = {
        "plan_shot": {"shot_id": "EP001_SH01", "shot_type": "WS"},
        "scene_id": "BATCH_001",
    }
    grouped_metadata = {
        "shot": {"prompt": "close on face", "shot_id": "EP001_SH02"},
        "batch_shots": ["EP001_SH02", "EP001_SH03"],
        "batch_summary": "two-shot exchange",
        "grouping": {"group_id": "G1"},
        "modality": "video_i2v",
        "generation_config": {"model": "seeddance-2.0"},
        "element_config": {"refs": ["char/x"]},
        "prompt_directive": "hold the eyeline",
        "scene_id": "BATCH_001",
        "inputs_fingerprint": "volatile-digest-0",
    }
    beats = [
        Beat(beat_id="B0", beat_metadata=init_style_metadata),
        Beat(beat_id="B1", beat_metadata=grouped_metadata),
    ]
    return Scene(scene_id="BATCH_001", beats=beats)


def _mutate_value(v):
    """Return a value that differs from *v* without modifying *v* itself.

    Strings gain a ``__mut__`` suffix, lists gain a ``__mut__`` element, dicts gain a
    ``__mut__`` key; anything else is replaced by the string ``"changed"``.
    """
    marker = "__mut__"
    if isinstance(v, dict):
        mutated = {**v, marker: "changed"}
    elif isinstance(v, list):
        mutated = v + [marker]
    elif isinstance(v, str):
        mutated = v + marker
    else:
        mutated = "changed"
    return mutated


def _apply_structural_mutation(scene, kind):
    """Apply the structural mutation named *kind* to *scene* in place.

    ``beat_reorder`` reverses the beat list, ``beat_add`` appends a new beat ``B2``
    and ``beat_remove`` keeps only the first beat. Any other *kind* is treated as a
    ``beat_metadata`` key and is replaced (via ``_mutate_value``) on every beat that
    carries it.

    Raises:
        AssertionError: if *kind* is a metadata key that no beat in *scene* carries.
            Without this guard a mistyped key would silently mutate nothing and the
            calling test would fail with a misleading "must change the fingerprint"
            message; this mirrors the param-list guard in ``_mutate_excluded``.
    """
    if kind == "beat_reorder":
        scene.beats = list(reversed(scene.beats))
    elif kind == "beat_add":
        scene.add_beat(Beat(beat_id="B2", beat_metadata={"shot": {"prompt": "new"}}))
    elif kind == "beat_remove":
        scene.beats = scene.beats[:1]
    else:  # a beat_metadata structural key — mutate on whichever beat(s) carry it
        carriers = [beat for beat in scene.beats if kind in beat.beat_metadata]
        if not carriers:  # pragma: no cover - guards the param list
            raise AssertionError(f"no beat carries structural key {kind!r}")
        for beat in carriers:
            beat.beat_metadata[kind] = _mutate_value(beat.beat_metadata[kind])


def _mutate_excluded(scene, field):
    """Mutate, in place, the single excluded (non-structural) *field* on *scene*.

    Each recognised name touches only the attribute(s) it stands for; an unrecognised
    name raises ``AssertionError`` so the parametrized list cannot drift silently.
    """
    if field == "inputs_fingerprint":
        metadata = scene.beats[1].beat_metadata
        metadata["inputs_fingerprint"] = "different-volatile-digest"
        return
    if field == "takes":
        scene.beats[0].new_take(workflow=_make_wf("t0"))
        return
    if field == "board":
        scene.beats[0].board = {
            "status": "approved",
            "artifact": "char/x.png",
            "source_sha256": "deadbeef",
            "approved_by": "jt",
            "updated_at": "2099-01-01T00:00:00Z",
        }
        return
    if field == "approved":
        scene.beats[0].approved = True
        return
    if field == "primary_take_id":
        new_take = scene.beats[0].new_take(workflow=_make_wf("t0"))
        scene.beats[0].primary_take_id = new_take.take_id
        return
    if field == "timestamps":
        scene.created_at = "2099-01-01T00:00:00Z"
        scene.beats[0].created_at = "2099-01-01T00:00:00Z"
        return
    if field == "phantom_recovery_count":
        scene.beats[0].phantom_recovery_count = 2
        return
    if field == "max_takes":
        scene.beats[0].max_takes = 9
        return
    if field == "locked":
        scene.locked = True
        scene.lock_reason = "staging"
        return
    # pragma: no cover - guards the param list
    raise AssertionError(f"unknown excluded field {field!r}")


@pytest.mark.parametrize(
    "kind", [*_STRUCTURAL_KEYS, "beat_reorder", "beat_add", "beat_remove"]
)
def test_fingerprint_per_structural_field_changes(kind):
    """Each structural metadata key, a beat reorder, a beat add and a beat remove
    must individually change the fingerprint of the base scene."""
    baseline = structure_fingerprint(_structural_base_scene())

    scene = _structural_base_scene()
    _apply_structural_mutation(scene, kind)
    after = structure_fingerprint(scene)

    assert after != baseline, f"{kind} must change the fingerprint"


@pytest.mark.parametrize("field", _EXCLUDED_FIELDS)
def test_fingerprint_per_excluded_field_unchanged(field):
    """Each excluded field (inputs_fingerprint included) can be mutated on the base
    scene without changing its fingerprint."""
    baseline = structure_fingerprint(_structural_base_scene())

    scene = _structural_base_scene()
    _mutate_excluded(scene, field)
    after = structure_fingerprint(scene)

    assert after == baseline, f"{field} must NOT change the fingerprint"


# ── Locking stress test: fcntl serialization is load-bearing (tests follow the Phase 5 section) ──


# ── Phase 5: mark_derived — the SOLE clearer of not_derived ──────────────────


def test_mark_derived_clears_not_derived(tmp_path, monkeypatch):
    """mark_derived on the active version flips not_derived to derived and persists it,
    is idempotent when repeated, and raises ValueError for a non-active version while
    leaving the manifest as it was."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    key = ("p", "ep_001", "BATCH_001")
    batch = "BATCH_001"

    store = _materialize_v1_v2(*key)  # v1 derived, v2 not_derived
    store.conform(batch, 2)           # pointer now at v2; downstream not recomputed
    before = load_manifest(*key)
    assert before["active_version"] == 2
    assert _downstream(before, 2) == "not_derived"

    # Marking the ACTIVE version clears its not_derived state, in memory and on disk.
    returned = store.mark_derived(batch, 2)
    assert _downstream(returned, 2) == "derived"
    persisted = load_manifest(*key)
    assert _downstream(persisted, 2) == "derived"

    # A second call on the already-derived version still reports derived.
    repeated = store.mark_derived(batch, 2)
    assert _downstream(repeated, 2) == "derived"

    # Pointer-race guard: a NON-active version (v1) is refused.
    with pytest.raises(ValueError):
        store.mark_derived(batch, 1)

    final = load_manifest(*key)
    assert _downstream(final, 1) == "derived"  # v1 was born derived and is untouched
    assert _downstream(final, 2) == "derived"
    assert final["active_version"] == 2        # pointer never moved


def test_mark_derived_flat_batch_is_noop(tmp_path, monkeypatch):
    """For a batch that only has a flat scene (no manifest), mark_derived returns None
    and does not create a manifest."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    key = ("p", "ep_001", "BATCH_001")

    flat_scene = Scene(scene_id="BATCH_001", beats=[Beat(beat_id="OPENING")])
    save_scene(flat_scene, scene_path(*key))
    store = SceneVersionStore("p", "ep_001")

    outcome = store.mark_derived("BATCH_001", 1)
    assert outcome is None
    # Still no manifest on disk after the call.
    assert load_manifest(*key) is None


def test_case_a_rechecks_under_lock_when_flat_appears(tmp_path, monkeypatch):
    """Simulate a flat v1 appearing just before the locked manifest update runs:
    write_scene_candidate must then append v2 and leave the raced-in flat body
    untouched rather than overwrite it."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    key = ("p", "ep_001", "BATCH_001")
    store = SceneVersionStore("p", "ep_001")
    flat_path = scene_path(*key)
    raced_flat = Scene(scene_id="BATCH_001", beats=[Beat(beat_id="RACED_FLAT")])
    candidate = Scene(scene_id="BATCH_001", beats=[Beat(beat_id="CANDIDATE")])
    real_locked_update = store._locked_manifest_update

    def _inject_flat_then_update(batch_id, fn):
        # Stand-in for the concurrent writer: drop the flat body in place right
        # before delegating to the real locked update.
        assert batch_id == "BATCH_001"
        assert not flat_path.exists()
        save_scene(raced_flat, flat_path)
        return real_locked_update(batch_id, fn)

    monkeypatch.setattr(store, "_locked_manifest_update", _inject_flat_then_update)

    def _beat_ids(path):
        return [beat.beat_id for beat in load_scene(path).beats]

    assigned_version = store.write_scene_candidate("BATCH_001", candidate)
    assert assigned_version == 2
    assert _beat_ids(flat_path) == ["RACED_FLAT"]

    manifest = load_manifest(*key)
    assert [entry["version"] for entry in manifest["versions"]] == [1, 2]
    legacy_entry, appended_entry = manifest["versions"]
    assert legacy_entry["source"] == "legacy_flat"
    assert appended_entry["state"] == "candidate"
    assert _beat_ids(scene_version_path(*key, 2)) == ["CANDIDATE"]


def test_concurrent_append_unique_versions(tmp_path, monkeypatch):
    """Fire N concurrent write_scene_candidate calls at one batch through
    _locked_manifest_update: every call gets a DISTINCT version, the manifest has exactly
    N candidates + the materialized v1 (no lost update / duplicate version), and every
    .vNNN.json body exists. Proves the fcntl serialization is load-bearing.

    Workers run as daemon threads: if one hangs on the lock, the ``is_alive`` assertion
    below fails the test, and the stuck thread cannot additionally block interpreter
    shutdown (which a non-daemon thread would)."""
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(tmp_path))
    save_scene(Scene(scene_id="BATCH_001", beats=[Beat(beat_id="OPENING")]),
               scene_path("p", "ep_001", "BATCH_001"))
    store = SceneVersionStore("p", "ep_001")

    n_workers = 8
    start = threading.Barrier(n_workers, timeout=30)
    results: dict[int, int] = {}          # worker index -> version it was assigned
    errors: list[BaseException] = []      # any exception raised inside a worker
    guard = threading.Lock()              # protects results/errors across workers

    def _worker(i: int) -> None:
        try:
            start.wait()  # maximize overlap so the lock is genuinely contended
            n = store.write_scene_candidate(
                "BATCH_001",
                Scene(scene_id="BATCH_001", beats=[Beat(beat_id=f"V{i}")]),
            )
            with guard:
                results[i] = n
        except BaseException as exc:  # noqa: BLE001 — surface ANY worker failure
            with guard:
                errors.append(exc)

    # daemon=True: a hung worker must fail the test, not wedge process exit.
    threads = [
        threading.Thread(target=_worker, args=(i,), daemon=True)
        for i in range(n_workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=60)

    assert not errors, f"worker errors: {errors}"
    assert not any(t.is_alive() for t in threads), "a worker thread hung on the lock"

    versions = sorted(results.values())
    assert len(set(versions)) == n_workers                 # every call → DISTINCT version
    assert versions == list(range(2, n_workers + 2))       # v2..v(N+1); v1 is materialized

    manifest = load_manifest("p", "ep_001", "BATCH_001")
    assert len(manifest["versions"]) == n_workers + 1      # N candidates + materialized v1
    assert manifest["active_version"] == 1                 # pointer never moves on append
    for v in versions:
        assert scene_version_path("p", "ep_001", "BATCH_001", v).exists()
