"""Per-atom regeneration — the write-side hand-assembly surface (REC-235 Phase 3).

regenerate_atom: append a NEW Take to a beat (append-only, never overwrite) via an
INJECTED generation callable, returning the new atom-version URN. set_canon/approve:
move the canon pointer (Beat.primary_take_id) to an approved take. Both persist through
the REC-231 active-version in-place writer (takes + primary_take_id are non-structural,
so the structure guard passes).

The generation step is INJECTED (a callable). This module NEVER calls VideoRunner or
dispatch — see the `generate` seam doc on regenerate_atom for the live wiring point
(the individual video_i2v VideoRunner path; identity rides composite character sheets,
settled 2026-06-17, so per-shot generation keeps identity without shot-adjacency).
"""

from __future__ import annotations

from typing import Callable, Optional

from recoil.core.paths import ProjectPaths
from recoil.pipeline._lib import ops_log
from recoil.pipeline.core.persistence import (
    SceneVersionConflictError,
    load_scene_active_with_version,
    save_active_scene_status,
)
from recoil.pipeline.core.take import (
    Beat,
    Scene,
    Take,
    atom_version_urn,
    parse_atom_urn,
    parse_atom_version_urn,
)
from recoil.pipeline.core.workflow import Workflow

# Type of the injected generation seam accepted by regenerate_atom.
#
# Contract: the callable is given the live Beat ONLY and returns the Workflow built
# for THIS atom's regeneration.
#
# The generator is INDEX-AGNOSTIC — it does NOT receive a guessed take_index, and it
# MUST NOT key any artifact path, payload, or provenance off one. regenerate_atom
# wraps the returned Workflow via Beat.new_take INSIDE the per-batch lock, so the
# beat OWNS take_index/take_id assignment against the lock-fresh body — id
# assignment + the duplicate-id guard stay inside Beat.
GenerateWorkflow = Callable[[Beat], Workflow]


def _locate_beat(scene: Scene, beat_id: str) -> Beat:
    """Look up ``beat_id`` among ``scene.beats`` and return the first match (fail-loud).

    The persist closures call this to re-resolve their beat against the scene body the
    writer hands them. Callers have already established that the beat exists, so a
    miss is reported as a data-integrity fault (ValueError) instead of a ``None``
    result the caller would have to branch on.
    """
    found = next(
        (candidate for candidate in scene.beats if candidate.beat_id == beat_id),
        None,
    )
    if found is None:
        raise ValueError(f"beat {beat_id!r} not found in scene {scene.scene_id!r}")
    return found


def regenerate_atom(
    project: str,
    beat_urn: str,
    generate: GenerateWorkflow,
    *,
    take_metadata: Optional[dict] = None,
) -> str:
    """Append one newly generated Take to the beat addressed by ``beat_urn`` and return
    the atom-version URN of that take.

    APPEND-ONLY: prior takes are never rewritten and the canon pointer is left where it
    is — approving a take is the separate ``set_canon`` call.

    GENERATION SEAM (execution contract (a)): ``generate(beat) -> Workflow`` must hand
    back a Workflow whose terminal video_i2v step ALREADY carries its receipt
    (``receipt.run_result.output_path`` populated), i.e. generation has finished INSIDE
    ``generate`` before it returns. This function never executes a workflow itself — no
    Take.execute, Workflow.run, dispatch, or VideoRunner call — it only wraps the
    already-receipted Workflow in a Take and persists it. Since the seam is the single
    place real generation can happen, the build and every test that injects a fake stay
    zero-spend / zero-network.

    LIVE WIRING (documented, NOT called here): the production ``generate(beat)`` builds
    the video_i2v Workflow with a VideoRunner-shaped payload (shot_id, prompt, model,
    start_frame, reference_images = the composite character sheets that carry identity,
    aspect_ratio, ...), runs it INTERNALLY (Take.execute -> Workflow.run -> dispatch of
    the video_i2v modality -> VideoRunner) and returns only once that step's receipt
    output_path is populated. It must stay INDEX-AGNOSTIC (no guessed take_index baked
    into any path or payload). The production caller is REC-241; this module never
    reaches it.

    INDEX ASSIGNMENT: the take_index is whatever ``Beat.new_take`` assigns inside the
    locked write, against the scene body the writer supplies. The returned URN is built
    from that value, captured while the write runs — never from a pre-guessed index.

    Raises ValueError when the episode token is not addressable, when no unique active
    beat matches, or when the ``generate`` result lacks a receipted, root-safe, on-disk
    terminal artifact; every one of these checks runs before the append, so the beat is
    left UNCHANGED. Re-raises SceneVersionConflictError from the locked write after
    recording the orphaned artifact through ``ops_log.write``.
    """
    episode, beat_id = parse_atom_urn(beat_urn)

    # The read model is the single owner of the beat locator and of the artifact-safety
    # helpers. Importing it lazily breaks the workspace->pipeline.core import cycle
    # (mirroring board_builder's resolve_atom_version import).
    from recoil.workspace.readmodel import (
        _persistence_episode,
        _safe_artifact_relpath,
        _terminal_artifact,
        _unique_active_beat,
    )

    episode_number = _persistence_episode(episode)
    if episode_number is None:
        raise ValueError(
            f"episode token {episode!r} is not addressable to a persisted episode "
            "(expected EP<NNN>)"
        )

    active_scene, live_beat = _unique_active_beat(project, episode_number, beat_id)
    if live_beat is None:
        raise ValueError(f"no active beat {beat_id!r} in episode {episode}")

    # load_scene_active_with_version already asserted scene.scene_id == batch_id, so the
    # scene id doubles as the write key. The second read exists only to obtain the
    # version the locked write will be checked against.
    # NOTE(review): live_beat comes from the locator's own read while the version comes
    # from this second read — presumably both describe the same snapshot; confirm no
    # writer can land between the two.
    batch_id = active_scene.scene_id
    _, version_at_load = load_scene_active_with_version(
        project, episode_number, batch_id
    )

    # Hand the beat to the INDEX-AGNOSTIC seam. The generator MUST NOT mutate the beat
    # (the real append targets a lock-fresh re-load further down) and MUST NOT key
    # anything off a guessed index.
    produced_workflow = generate(live_beat)

    # Gate the append on the SAME terminal artifact the read model would expose, by
    # asking readmodel._terminal_artifact about a throwaway candidate Take instead of
    # re-implementing its step walk here — the write-side guard and the read-side
    # resolver therefore cannot drift. Any failure raises before the beat is touched.
    artifact_rel = _terminal_artifact(
        project,
        Take(
            take_id=f"{beat_id}_regen_candidate",
            take_index=0,
            workflow=produced_workflow,
        ),
    )
    if artifact_rel is None:
        raise ValueError(
            "generate(beat) returned a workflow with no receipted terminal artifact "
            "or an artifact outside the project root (execution-contract-(a) violation)"
        )

    # Re-apply the read model's own safety contract (_safe_artifact_relpath): an
    # out-of-root / '..'-escaping path is what the resolver refuses, and a missing file
    # is what the cut render would BoardBuilderError on. Both are rejected at append
    # time.
    root = ProjectPaths.for_project(project).project_root
    artifact_rel = _safe_artifact_relpath(root, artifact_rel)
    if artifact_rel is None:
        raise ValueError(
            "generated artifact escapes the project root "
            "(the read-model resolver would refuse it)"
        )
    artifact_on_disk = root / artifact_rel
    if not artifact_on_disk.exists():
        raise ValueError(
            f"generated artifact {artifact_rel!r} is missing on disk "
            "(the cut render would fail to resolve it)"
        )

    # Append under the per-batch lock. new_take picks the REAL take_index during the
    # write; the closure records it so the URN below is stamped from that value.
    episode_token = f"ep_{episode_number:03d}"
    stamped_indices: list[int] = []

    def _append_take(fresh_scene: Scene) -> None:
        fresh_beat = _locate_beat(fresh_scene, beat_id)
        appended = fresh_beat.new_take(
            workflow=produced_workflow, take_metadata=take_metadata
        )
        stamped_indices.append(appended.take_index)

    # By now the artifact from `generate()` is already produced, receipted and SPENT.
    # Should the active version have moved since our load (a concurrent conform/revert,
    # or another stale regenerate taking the lock first), save_active_scene_status
    # raises SceneVersionConflictError and writes NOTHING, which would strand that
    # artifact outside append-only scene history. Contract (c): log the orphan through
    # the established ops-log seam (ops_log.write) BEFORE re-raising, so a spent
    # artifact is never silently lost. Fail loud, fail closed — the conflict is never
    # swallowed.
    try:
        save_active_scene_status(
            project,
            episode_token,
            batch_id,
            expected_version=version_at_load,
            mutate=_append_take,
        )
    except SceneVersionConflictError as conflict:
        orphan_record = {
            "event": "regen_orphan_artifact",
            "project": project,
            "episode": episode_token,
            "batch_id": batch_id,
            "beat_id": beat_id,
            "artifact": str(artifact_rel),
            "expected_version": conflict.expected_version,
            "actual_version": conflict.actual_version,
            "cause": "scene_version_conflict",
        }
        ops_log.write(orphan_record)
        raise

    return atom_version_urn(episode, beat_id, stamped_indices[0])


def set_canon(project: str, beat_urn: str, atom_version_urn: str) -> None:
    """Point the beat's canon pointer (``Beat.primary_take_id``) at an approved take.

    APPROVAL ONLY — the take whose ``take_index`` matches the atom-version URN is looked
    up on the beat and ``primary_take_id`` is set to that take's id; no take is added,
    removed or rewritten. ``primary_take_id`` is non-structural, so the REC-231
    structure guard passes.

    Raises ValueError when the atom-version URN names a different episode/beat than
    ``beat_urn``, when the episode token is not addressable, when no unique active beat
    matches, or (from inside the locked write) when the beat has no take with the
    requested index.

    Note: inside this function the ``atom_version_urn`` parameter shadows the
    module-level helper of the same name; here it is only ever the URN string.
    """
    episode, beat_id = parse_atom_urn(beat_urn)
    version_episode, version_beat_id, take_index = parse_atom_version_urn(
        atom_version_urn
    )
    # The version URN must address the very beat the caller named.
    if not (episode == version_episode and beat_id == version_beat_id):
        raise ValueError(
            f"atom-version URN {atom_version_urn!r} does not belong to beat {beat_urn!r} "
            f"(got episode/beat {version_episode}/{version_beat_id})"
        )

    # Same single-owner locator regenerate_atom uses; imported lazily to stay
    # cycle-safe.
    from recoil.workspace.readmodel import _persistence_episode, _unique_active_beat

    episode_number = _persistence_episode(episode)
    if episode_number is None:
        raise ValueError(
            f"episode token {episode!r} is not addressable to a persisted episode "
            "(expected EP<NNN>)"
        )

    active_scene, live_beat = _unique_active_beat(project, episode_number, beat_id)
    if live_beat is None:
        raise ValueError(f"no active beat {beat_id!r} in episode {episode}")

    # The scene id is the write key; this read supplies the version the locked write
    # is checked against.
    batch_id = active_scene.scene_id
    _, version_at_load = load_scene_active_with_version(
        project, episode_number, batch_id
    )
    episode_token = f"ep_{episode_number:03d}"

    def _point_canon(fresh_scene: Scene) -> None:
        fresh_beat = _locate_beat(fresh_scene, beat_id)
        for take in fresh_beat.takes:
            if take.take_index == take_index:
                # LOOK UP the id instead of deriving it — externally-constructed beats
                # may have non-new_take ids.
                fresh_beat.primary_take_id = take.take_id
                return
        raise ValueError(
            f"no atom-version @t{take_index} on beat {beat_id!r} "
            "(cannot point canon at a non-existent take)"
        )

    save_active_scene_status(
        project,
        episode_token,
        batch_id,
        expected_version=version_at_load,
        mutate=_point_canon,
    )


# `approve` is the editorial-facing alias for the canon-pointer move. It is bound to
# the same function object as `set_canon`, so both names share one signature and one
# behaviour.
approve = set_canon
