"""SceneVersionStore — the single owner of every scene-version manifest mutation.

REC-231. A per-batch sidecar manifest ``ep_NNN_BATCH_NNN.versions.json`` records
``{schema_version, batch_id, active_version, versions[]}``; each version body is a
structure-immutable ``ep_NNN_BATCH_NNN.vNNN.json`` (or the flat ``ep_NNN_BATCH_NNN.json``
for a materialized-legacy v1). This class is the ONLY code that mutates a manifest —
``persistence.py`` holds only path/load/parse primitives plus the immutable version-body
``save_scene`` — and the ONLY code that moves ``active_version`` (via ``conform``/``revert``).

Phase 2 shipped ``write_scene_candidate`` (the append-only re-derivation writer). Phase 3
adds ``conform``/``revert`` (the sole pointer movers), wires ``structure_fingerprint``
into every manifest write (no entry left null), and centralizes the locked
read-modify-write in the shared ``_locked_manifest_update`` helper that all four
mutators share. ``mark_derived`` (Phase 5) lands on this same class and the same helper.
"""
from __future__ import annotations

import fcntl
import os
import re
from typing import Callable

from recoil.core.atomic_write import atomic_write_json
from recoil.pipeline._lib.schema_versions import SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION
from recoil.pipeline.core.persistence import (
    assert_batch_identity,
    load_manifest,
    load_scene,
    manifest_artifact_path,
    save_scene,
    scene_manifest_path,
    scene_path,
    scene_version_path,
    structure_fingerprint,
)
from recoil.pipeline.core.receipts import utc_now_iso8601
from recoil.pipeline.core.take import Scene

# REC-231 Phase 6: the recorded classification of a candidate version. Validated on
# write; auto pickup/reshoot classification is DEFERRED — the caller passes the value.
_VERSION_KINDS = ("pickup", "reshoot")


class SceneVersionStore:
    """Single-writer owner of a batch's versions manifest (REC-231)."""

    def __init__(self, project: str, episode: int | str) -> None:
        self.project = project
        self.episode = episode

    # ── Phase 2: append-only candidate writer ───────────────────────────────────
    def write_scene_candidate(
        self,
        requested_batch_id: str,
        scene: Scene,
        *,
        parent_version: int | None = None,
        source: str = "rederive",
        state: str = "candidate",
        kind: str = "reshoot",
        dedup_against_versions: bool = False,
    ) -> int:
        """Append ``scene`` as a new immutable candidate version; never move the pointer.

        ``requested_batch_id`` is the PRE-derivation selector target the caller asked to
        re-derive (NOT ``scene.scene_id`` — Phase 6 compares the two for the identity
        halt). In Phase 2 both are passed through as-is.

        Case A — no manifest AND no flat body (brand-new batch): there is nothing to
          re-derive yet, so the body is the batch's INITIAL flat ``ep_NNN_BATCH_NNN.json``
          (implicit v1 active, no manifest). Every flat-keyed discovery surface resolves
          it exactly as today. Returns ``1``.
        Case B — no manifest BUT a flat body exists (legacy/initial batch, first
          re-derive): materialize the manifest registering the existing flat file IN
          PLACE as v1 (``approved``/``derived``/``legacy_flat``, active) — no copy, no
          rename — then append the new structure as the next version.
        Case C — manifest exists (re-derive of an already-versioned batch): append the
          new structure as a candidate.

        The pointer (``active_version``) never moves on an append, and every newly
        appended entry records ``structure_fingerprint(scene)`` (Phase 3 — no entry left
        null). Returns the new version number N.

        Phase 6 HALTS before ANY write (no body, no manifest entry, no lock side effect)
        when the generated ``scene.scene_id`` disagrees with ``requested_batch_id`` (a
        renumber is a new identity, never a version), and validates the recorded
        ``kind ∈ {"pickup","reshoot"}`` (auto-classification is DEFERRED — the caller
        supplies it).

        When ``dedup_against_versions`` is true, the append path first scans the
        manifest's registered versions for the candidate's structure fingerprint and
        returns that existing version if found. The default is false so callers that
        intentionally create same-structure versions keep append-only behavior.
        """
        # Identity-halt + kind validation FIRST — before any path computation or write.
        assert_batch_identity(requested_batch_id, scene)
        if kind not in _VERSION_KINDS:
            raise ValueError(
                f"invalid kind {kind!r}: must be one of {_VERSION_KINDS} "
                "(the caller records pickup/reshoot — auto-classification is DEFERRED)"
            )
        batch_id = requested_batch_id
        flat_path = scene_path(self.project, self.episode, batch_id)

        # Cases A/B/C all run under the per-batch manifest lock. Case A still writes no
        # manifest, but it must recheck inside the lock so a concurrent first flat write
        # cannot be overwritten by another initial writer.
        def _append(manifest: dict | None) -> tuple[dict | None, int]:
            candidate_fp = structure_fingerprint(scene)
            if manifest is None:
                if not flat_path.exists():
                    save_scene(scene, flat_path)
                    return None, 1
                # Case B: materialize the legacy flat as v1 (in place, byte-identical).
                # Its recorded fp is provisional/minimal-body state; the first real
                # re-derive is allowed to append once and establish a rebuild fp.
                manifest = self._materialize_legacy_manifest(batch_id, flat_path)
            # Dedup uses the FULL structure_fingerprint (which INCLUDES shot.raw),
            # DELIBERATELY NOT canonical_shot_identity (which drops _BUILDER_VARIANT_SHOT_KEYS
            # incl. `raw`). This is the opposite projection from Writer 1's mutation guard,
            # ON PURPOSE — they answer different questions. Writer 1 asks "cross-BUILDER
            # representation variance?" (raw excluded — a minimally-persisted body vs a runner
            # rebuild legitimately differ in raw). Dedup asks "cross-RE-DERIVATION content
            # identity?" — and shot.raw carries `description` (the prompt source, episode_runner
            # ~:3092) and `source_text_hash` (script identity), so two re-derives differing only
            # in raw represent a REAL script/prompt change that MUST append a reviewable
            # candidate. Excluding raw here would dedup real script edits away (the canon layer's
            # whole purpose). Conservative-append on a pure-render-flag-only diff is acceptable
            # (an extra candidate is ignorable); deduping a real change is destructive.
            if dedup_against_versions:
                for entry in manifest.get("versions", []):
                    if entry.get("structure_fingerprint") == candidate_fp:
                        version = entry.get("version")
                        if isinstance(version, int):
                            return manifest, version

            n = self._next_version(manifest, batch_id)
            target = scene_version_path(self.project, self.episode, batch_id, n)
            # Write-once guard: a .vNNN.json body's structure is immutable; never let
            # save_scene's os.replace silently overwrite one.
            if target.exists():
                raise FileExistsError(
                    f"refusing to overwrite existing version body {target.name} for "
                    f"batch {batch_id} (a version body's structure is immutable)"
                )
            save_scene(scene, target)
            manifest["versions"].append(
                {
                    "version": n,
                    "artifact": target.name,
                    "state": state,
                    "downstream": "not_derived",  # a fresh candidate has no boards yet
                    "kind": kind,
                    "source": source,
                    "parent_version": parent_version,
                    "structure_fingerprint": candidate_fp,
                    "shot_count": len(scene.beats),
                    "created_at": utc_now_iso8601(),
                }
            )
            # Do NOT change active_version — conform/revert own the pointer move.
            return manifest, n

        return self._locked_manifest_update(batch_id, _append)

    # ── Phase 3: conform + revert — the SOLE movers of active_version ────────────
    def conform(self, batch_id: str, version: int) -> dict:
        """Point ``active_version`` at ``version`` + mark it approved (sole pointer mover).

        Requires the manifest to exist (a conform presupposes a registered candidate)
        and ``version`` to be a registered version — else ``ValueError``. Does NOT
        recompute ``downstream``: the target already carries its own (born
        ``not_derived`` for a candidate, ``derived`` for a boarded version), and the
        freshness gate reads THAT. Does NOT auto-map old boards onto new beats, and does
        NOT mutate any version body. Returns the updated manifest.
        """

        def _conform(manifest: dict | None) -> tuple[dict, dict]:
            if manifest is None:
                raise ValueError(
                    f"cannot conform batch {batch_id!r}: no versions manifest "
                    "(a conform presupposes a registered candidate)"
                )
            entry = self._require_version(manifest, batch_id, version)
            self._validate_pointer_target(batch_id, manifest, version)
            manifest["active_version"] = version
            entry["state"] = "approved"
            return manifest, manifest

        return self._locked_manifest_update(batch_id, _conform)

    def revert(self, batch_id: str, version: int) -> dict:
        """Point ``active_version`` back at a prior ``version`` (the explicit go-back verb).

        Lossless: ONLY ``active_version`` changes — no version body and no version's
        ``downstream`` is touched, so a previously-``derived`` version comes back
        immediately dispatch-ready. Requires the manifest to exist and ``version`` to be
        registered — else ``ValueError``. Returns the updated manifest.
        """

        def _revert(manifest: dict | None) -> tuple[dict, dict]:
            if manifest is None:
                raise ValueError(
                    f"cannot revert batch {batch_id!r}: no versions manifest"
                )
            self._require_version(manifest, batch_id, version)
            self._validate_pointer_target(batch_id, manifest, version)
            manifest["active_version"] = version
            return manifest, manifest

        return self._locked_manifest_update(batch_id, _revert)

    # ── Phase 5: mark_derived — the SOLE clearer of not_derived ──────────────────
    def mark_derived(self, batch_id: str, version: int) -> dict | None:
        """Flip a version's ``downstream`` to ``"derived"`` after a successful re-board.

        The SOLE clearer of ``not_derived`` (a candidate is born ``not_derived`` on
        append; this clears it once the active version has been re-boarded, so the
        downstream freshness gate unblocks). Under the manifest lock it ASSERTS
        ``version == active_version`` and raises ``ValueError`` otherwise — a
        delayed/racing board build must never mark a NON-active version fresh (that would
        unblock dispatch against the wrong version). Idempotent on an already-``derived``
        version. A flat batch has no manifest (hence no ``not_derived`` state to clear) →
        no-op returning ``None``; otherwise returns the updated manifest.
        """
        # A flat batch is never blocked by the freshness gate (no manifest, no
        # downstream state) — there is nothing to clear, so no-op rather than
        # materialize a manifest (no flag-day).
        if load_manifest(self.project, self.episode, batch_id) is None:
            return None

        def _mark(manifest: dict | None) -> tuple[dict, dict]:
            if manifest is None:  # raced away between the pre-check and the lock
                raise ValueError(
                    f"cannot mark_derived batch {batch_id!r}: no versions manifest"
                )
            active = manifest["active_version"]
            if version != active:
                raise ValueError(
                    f"refusing to mark_derived v{version} of batch {batch_id!r}: it is "
                    f"not the active version (v{active}) — a board build must not mark a "
                    "non-active version fresh (the pointer-race guard)"
                )
            entry = self._require_version(manifest, batch_id, version)
            entry["downstream"] = "derived"
            return manifest, manifest

        return self._locked_manifest_update(batch_id, _mark)

    # ── internal: the single locked read-modify-write shared by all mutators ─────
    def _locked_manifest_update(
        self, batch_id: str, fn: Callable[[dict | None], tuple[dict | None, object]]
    ) -> object:
        """Run a manifest read-modify-write under the per-batch advisory lock.

        Takes ``fcntl.flock(LOCK_EX)`` on the ``ep_NNN_BATCH_NNN.versions.lock`` sidecar
        (the same mechanism ``recoil/core/atomic_write.py`` uses for
        ``jsonl_append_locked``), RE-reads the manifest under the lock (so a concurrent
        mutator's committed write is observed — no lost-update / duplicate-version race),
        applies ``fn(manifest)`` — which returns ``(manifest_to_write, result)`` and may
        write a scene body as a side effect (the append writes its new ``.vNNN.json``,
        and Case A writes its initial flat body here) — writes ``manifest_to_write``
        atomically when present, and returns ``result``. ``fn`` receives ``None`` when no
        manifest exists yet (the append's Case A writes no manifest; Case B materializes
        one). This is the single locked path shared by ``write_scene_candidate``,
        ``conform``, ``revert`` (and ``mark_derived`` in Phase 5); no manifest mutation
        bypasses it.
        """
        manifest_path = scene_manifest_path(self.project, self.episode, batch_id)
        lock_path = manifest_path.with_suffix(".lock")
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR)
        try:
            fcntl.flock(lock_fd, fcntl.LOCK_EX)
            manifest = load_manifest(self.project, self.episode, batch_id)
            new_manifest, result = fn(manifest)
            if new_manifest is not None:
                atomic_write_json(manifest_path, new_manifest)
            return result
        finally:
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
            os.close(lock_fd)

    @staticmethod
    def _require_version(manifest: dict, batch_id: str, version: int) -> dict:
        """Return the manifest entry for ``version``, else raise ``ValueError``."""
        for entry in manifest.get("versions", []):
            if entry.get("version") == version:
                return entry
        raise ValueError(
            f"version {version} is not registered in the manifest for batch "
            f"{batch_id!r}"
        )

    def _validate_pointer_target(self, batch_id: str, manifest: dict, version: int) -> None:
        """Fail closed unless ``version`` resolves to an existing body for ``batch_id``."""
        target = manifest_artifact_path(
            self.project, self.episode, batch_id, manifest, version
        )
        if not target.exists():
            raise FileNotFoundError(f"Scene JSON not found: {target}")
        scene = load_scene(target)
        if scene.scene_id != batch_id:
            raise ValueError(
                f"cannot move pointer for batch {batch_id!r} to v{version}: "
                f"body scene_id {scene.scene_id!r} does not match"
            )

    def _materialize_legacy_manifest(self, batch_id: str, flat_path) -> dict:
        """Register the existing flat body as v1 (approved/derived/legacy_flat, active).

        No copy, no rename — v1's recorded ``artifact`` is the flat basename, so the
        loader resolves the pre-versioning flat file byte-identically as v1. Its
        ``structure_fingerprint`` is computed from the loaded flat scene (Phase 3 — no
        entry left null). Returned in-memory; the caller appends the candidate and writes
        the manifest once.
        """
        flat_scene = load_scene(flat_path)
        return {
            "schema_version": SCENE_VERSIONS_MANIFEST_SCHEMA_VERSION,
            "batch_id": batch_id,
            "active_version": 1,
            "versions": [
                {
                    "version": 1,
                    "artifact": flat_path.name,
                    "state": "approved",
                    "downstream": "derived",  # its boards already exist in the flat body
                    "kind": "reshoot",
                    "source": "legacy_flat",
                    "parent_version": None,
                    "structure_fingerprint": structure_fingerprint(flat_scene),
                    "shot_count": len(flat_scene.beats),
                    "created_at": utc_now_iso8601(),
                }
            ],
        }

    def _next_version(self, manifest: dict, batch_id: str) -> int:
        """N = max(highest manifest version, highest .vNNN body on disk) + 1.

        Taking the max over BOTH the manifest AND the filesystem makes the append
        crash-safe: an orphan body from a prior append that died before its manifest
        write never collides (N strictly advances past it). The orphan is a harmless
        unreferenced body — a deferred-prune target.
        """
        manifest_max = max(
            (int(v["version"]) for v in manifest.get("versions", [])),
            default=0,
        )
        disk_max = self._highest_disk_version(batch_id)
        return max(manifest_max, disk_max) + 1

    def _highest_disk_version(self, batch_id: str) -> int:
        sample = scene_version_path(self.project, self.episode, batch_id, 1)
        scenes_dir = sample.parent
        if not scenes_dir.is_dir():
            return 0
        # sample.name == "ep_NNN_BATCH_NNN.v001.json"; match this batch's `.vNNN.json` bodies.
        stem = sample.name[: sample.name.rindex(".v")]
        pat = re.compile(re.escape(stem) + r"\.v(\d+)\.json$")
        best = 0
        for entry in scenes_dir.iterdir():
            match = pat.match(entry.name)
            if match:
                best = max(best, int(match.group(1)))
        return best
