"""Take model — editorial wrapper around Workflow.

CP-7 deliverable. Wraps the CP-6 Workflow surface in a typed editorial
data model. A Take is one attempt at a logical shot — it wraps exactly
one Workflow. A Beat groups multiple Takes; the primary_take_id pointer
selects the best attempt. A Scene is a thin grouping of Beats.

Schema is locked at CP-7. CP-8 adds audio Takes (retry-as-new-take
pattern). CP-9 fills `take.aggregate_score` and replaces the
"first_success" primary-selection default with score-based selection.

Public surface (frozen at CP-7):
    Take        — one generation attempt
    Beat        — a group of Takes for one logical shot/beat
    Scene       — a thin grouping of Beats
    TakeStatus  — Literal["pending","running","succeeded","failed","partial"]
    Take.execute     — wraps Workflow.run (Phase 3)
    Take.to_dict / from_dict
    Beat.new_take / add_take / select_primary  (Phase 4) / primary_take property
    Beat.to_dict / from_dict
    Scene.add_beat
    Scene.to_dict / from_dict

JSON round-trip: Take.from_dict(t.to_dict()) == t (when no execution
state attached) and step-for-step equal (when executed — receipts
round-trip via the underlying Workflow). Same property holds for
Beat and Scene.

CP-7 is in-memory only — no disk persistence (mirrors CP-6 precedent).
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from pathlib import PurePath
from typing import Any, ClassVar, Literal, Optional

from recoil.pipeline.core.receipts import utc_now_iso8601
from recoil.pipeline.core.workflow import HookFn, Workflow


TakeStatus = Literal["pending", "running", "succeeded", "failed", "partial"]
BeatStatus = Literal[
    "approved", "not_started", "generating", "review", "exhausted", "pending"
]
SceneStatus = Literal[
    "final", "review", "generating", "not_started", "in_progress"
]


# ──────────────────────────────────────────────────────────────────────
# REC-235 Phase 0: atom-version identity URNs — pure builder/parser
# helpers (no I/O). The atom (identity) is the Beat; the atom-version is a
# Take, addressed atom:{episode}/beat/{beat_id}@t{take_index}. batch_id is
# provenance on the Take, never the atom's identity.
# ──────────────────────────────────────────────────────────────────────

# Shared token grammar — episode + beat_id are alnum/underscore (URN-safe). The BUILDER
# (atom_urn) and BOTH parsers validate against the SAME grammar, so a URN that is built
# always re-parses (fixes the builder/parser disagreement: atom_urn previously only rejected
# '/','@',':' while the parsers required [A-Za-z0-9_]+).
_URN_TOKEN_RE = re.compile(r"^[A-Za-z0-9_]+$")
_ATOM_URN_RE = re.compile(r"^atom:(?P<episode>[A-Za-z0-9_]+)/beat/(?P<beat_id>[A-Za-z0-9_]+)$")
_ATOM_VERSION_URN_RE = re.compile(
    r"^atom:(?P<episode>[A-Za-z0-9_]+)/beat/(?P<beat_id>[A-Za-z0-9_]+)@t(?P<take_index>\d+)$"
)


def _validate_urn_token(label: str, val: str) -> None:
    if not isinstance(val, str) or not _URN_TOKEN_RE.match(val):
        raise ValueError(f"atom URN {label} must match [A-Za-z0-9_]+ (URN-safe), got {val!r}")


def atom_urn(episode: str, beat_id: str) -> str:
    """Beat-grain atom identity URN: ``atom:{episode}/beat/{beat_id}``.

    ``episode`` is a canonical episode token (e.g. ``EP001``); ``beat_id`` is the Beat's stable
    identity (e.g. ``EP001_SH02``). BOTH tokens are validated against the SAME [A-Za-z0-9_]+
    grammar the parsers enforce, so a built URN always round-trips through ``parse_atom_urn``.
    """
    _validate_urn_token("episode", episode)
    _validate_urn_token("beat_id", beat_id)
    return f"atom:{episode}/beat/{beat_id}"


def atom_version_urn(episode: str, beat_id: str, take_index: int) -> str:
    """Atom-version URN: ``atom:{episode}/beat/{beat_id}@t{take_index}`` (the @tN candidate)."""
    if not isinstance(take_index, int) or take_index < 0:
        raise ValueError(f"take_index must be a non-negative int, got {take_index!r}")
    return f"{atom_urn(episode, beat_id)}@t{take_index}"


def parse_atom_urn(urn: str) -> tuple[str, str]:
    """Parse a BEAT-grain atom URN (NO @tN) into ``(episode, beat_id)``. Fail-loud on malformed
    input. This is the SINGLE canonical beat-URN parser — readmodel.get_atom MUST use this, not a
    re-implemented inverse."""
    if not isinstance(urn, str):
        raise TypeError(f"atom URN must be a string, got {type(urn).__name__}")
    m = _ATOM_URN_RE.match(urn)
    if not m:
        raise ValueError(f"malformed atom URN {urn!r} — expected 'atom:{{episode}}/beat/{{beat_id}}'")
    return m.group("episode"), m.group("beat_id")


def parse_atom_version_urn(urn: str) -> tuple[str, str, int]:
    """Parse an atom-VERSION URN into ``(episode, beat_id, take_index)``. Fail-loud on malformed input."""
    if not isinstance(urn, str):
        raise TypeError(f"atom-version URN must be a string, got {type(urn).__name__}")
    m = _ATOM_VERSION_URN_RE.match(urn)
    if not m:
        raise ValueError(
            f"malformed atom-version URN {urn!r} — expected 'atom:{{episode}}/beat/{{beat_id}}@t{{N}}'"
        )
    return m.group("episode"), m.group("beat_id"), int(m.group("take_index"))


@dataclass
class Take:
    """One generation attempt at a single beat. Wraps exactly one Workflow.

    Fields: take_id (unique within parent Beat), take_index (0-based,
    assigned by Beat.new_take), workflow (locked at construction),
    status (pending → running → succeeded|failed|partial), created_at
    (ISO 8601 UTC), take_metadata (free-form dict; recommended keys:
    "model", "operator", "notes", "duration_ms").

    Status compression: succeeded = every step succeeded; failed = no
    step succeeded; partial = at least one succeeded AND at least one
    failed/skipped. Editorial re-attempt idiom is Beat.new_take(...),
    not take.execute() twice (both supported, former preferred).
    """

    take_id: str
    take_index: int
    workflow: Workflow
    status: TakeStatus = "pending"
    created_at: str = field(default_factory=utc_now_iso8601)
    take_metadata: dict[str, Any] = field(default_factory=dict)
    parent_take_id: Optional[str] = None
    aggregate_score: Optional[float] = None
    provenance: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if not isinstance(self.take_id, str) or not self.take_id:
            raise ValueError(
                f"Take.take_id must be a non-empty string, got {self.take_id!r}"
            )
        if not isinstance(self.take_index, int) or self.take_index < 0:
            raise ValueError(
                f"Take.take_index must be a non-negative int, got {self.take_index!r}"
            )
        if not isinstance(self.workflow, Workflow):
            raise TypeError(
                f"Take.workflow must be a Workflow, got {type(self.workflow).__name__}"
            )
        if not isinstance(self.take_metadata, dict):
            raise TypeError(
                f"Take.take_metadata must be a dict, got {type(self.take_metadata).__name__}"
            )
        if not isinstance(self.provenance, dict):
            raise TypeError(
                f"Take.provenance must be a dict, got {type(self.provenance).__name__}"
            )
        if self.status not in ("pending", "running", "succeeded", "failed", "partial"):
            raise ValueError(
                f"Take.status must be one of "
                f"['pending', 'running', 'succeeded', 'failed', 'partial'], "
                f"got {self.status!r}"
            )

    def to_dict(self) -> dict[str, Any]:
        """Serialize for JSON. Workflow round-trips via Workflow.to_dict."""
        return {
            "take_id": self.take_id,
            "take_index": self.take_index,
            "workflow": self.workflow.to_dict(),
            "status": self.status,
            "created_at": self.created_at,
            "take_metadata": dict(self.take_metadata),
            "parent_take_id": self.parent_take_id,
            "aggregate_score": self.aggregate_score,
            "provenance": dict(self.provenance),
        }

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "Take":
        return cls(
            take_id=d["take_id"],
            take_index=int(d["take_index"]),
            workflow=Workflow.from_dict(d["workflow"]),
            status=d.get("status", "pending"),
            created_at=d.get("created_at") or utc_now_iso8601(),
            take_metadata=dict(d.get("take_metadata") or {}),
            parent_take_id=d.get("parent_take_id"),
            aggregate_score=d.get("aggregate_score"),
            provenance=dict(d.get("provenance") or {}),
        )

    def compute_aggregate_score(self) -> Optional[float]:
        """Compute and store aggregate_score from step.receipt.eval_scores.

        Aggregation formula (CP-9 default):
            mean of `panel_score` floats across all PanelOfJudges scorecards
            in `step.receipt.eval_scores` across every step in
            `self.workflow.steps`.

        Algorithm:
          1. Walk every step in self.workflow.steps.
          2. For each step with a non-None receipt: iterate
             receipt.eval_scores (a dict keyed by panel_id, values are
             scorecard dicts). For each scorecard that exposes a non-None
             numeric `panel_score`, collect the float.
          3. Skip non-dict scorecard values, skip None panel_scores, skip
             non-numeric panel_scores (TypeError / ValueError swallowed).
          4. If the collected list is empty (no panel ever scored), set
             self.aggregate_score = None and return None.
          5. Otherwise self.aggregate_score = mean(scores) and return it.

        Multi-panel weighting is intentionally simple in CP-9 — weighted
        aggregation is a CP-N+ concern. Per JT default, flagged for
        spec-review.

        Returns:
            The computed aggregate_score (Optional[float]). Also stores it
            on self as a side effect.
        """
        import math
        import statistics

        scores: list[float] = []
        for step in self.workflow.steps:
            r = getattr(step, "receipt", None)
            if r is None:
                continue
            for _panel_id, scorecard in (r.eval_scores or {}).items():
                if not isinstance(scorecard, dict):
                    continue
                ps = scorecard.get("panel_score")
                if ps is None:
                    continue
                try:
                    v = float(ps)
                except (TypeError, ValueError):
                    continue
                # NaN / Inf must NOT pollute the mean — sort orderings
                # become non-deterministic and the average is meaningless.
                if math.isnan(v) or math.isinf(v):
                    continue
                scores.append(v)
        if not scores:
            self.aggregate_score = None
            return None
        # statistics.fmean for byte-equivalent behavior with eval.py's
        # PanelOfJudges aggregation (different layer, same idiom).
        agg = float(statistics.fmean(scores))
        self.aggregate_score = agg
        return agg


@dataclass
class Beat:
    """One logical shot/beat. Groups Takes; primary_take_id selects best attempt.

    Fields: beat_id (recommended `EP{NNN}_SH{NN}`), takes (ordered list,
    take_index 0-based assigned by new_take), primary_take_id (optional;
    Phase 4 select_primary sets via "first_success" default), beat_metadata
    (free-form; recommended keys: scene_id, shot_id, notes), created_at.
    board is the storyboard sub-state:
    {"status": "proposed"|"approved"|"rejected", "artifact": str,
    "source_sha256": str, "approved_by": Optional[str], "updated_at": str}.
    board["artifact"] is project-relative to the project data root.
    """

    beat_id: str
    takes: list[Take] = field(default_factory=list)
    primary_take_id: Optional[str] = None
    beat_metadata: dict[str, Any] = field(default_factory=dict)
    created_at: str = field(default_factory=utc_now_iso8601)
    max_takes: int = 3
    approved: bool = False
    phantom_recovery_count: int = 0
    board: Optional[dict] = None

    # Hard cap on phantom recoveries — prevents unbounded retries when an
    # artifact path is permanently broken (REC-19 option C).
    _MAX_PHANTOM_RECOVERIES: ClassVar[int] = 2

    @property
    def is_exhausted(self) -> bool:
        """True when no more Takes will be attempted on this Beat.

        Exhausted = takes-limit reached AND no primary chosen.
        Each phantom recovery (up to _MAX_PHANTOM_RECOVERIES) extends the
        effective take limit by one slot (REC-19).
        """
        if self.primary_take is not None:
            return False
        effective_max = self.max_takes + min(
            self.phantom_recovery_count, self._MAX_PHANTOM_RECOVERIES
        )
        return len(self.takes) >= effective_max

    def grant_phantom_recovery(self) -> bool:
        """Increment phantom_recovery_count if below cap. Returns True if granted."""
        if self.phantom_recovery_count >= self._MAX_PHANTOM_RECOVERIES:
            return False
        self.phantom_recovery_count += 1
        return True

    def set_board_proposed(
        self,
        artifact: str,
        source_sha256: str,
        fingerprint_version: int = 1,
        composition_ref: Optional[dict] = None,
    ) -> None:
        """Set the storyboard sub-state to proposed.

        ``composition_ref`` (REC-240) is the OPTIONAL one-level-of-indirection seam: a
        structured reference to a ``CompositionManifest`` (``kind``/``members``/``layout``)
        plus its rendered ``view`` path, carried ALONGSIDE — never replacing — the flat
        ``artifact`` PNG. When absent the board is a legacy flat board and resolves exactly
        as before (back-compat is mandatory — this is the live spend path). When present the
        board is a derived projection of the composition; the resolver indirects through it
        (pointer → manifest → view). ``atom_refs`` is a reserved passthrough extension point.
        """
        if not isinstance(artifact, str) or not artifact:
            raise ValueError("board artifact must be a non-empty project-relative path")
        self._validate_board_artifact_path(artifact, "board artifact")
        if not isinstance(source_sha256, str) or not source_sha256:
            raise ValueError("board source_sha256 must be a non-empty string")
        if type(fingerprint_version) is not int or fingerprint_version not in (1, 2):
            raise ValueError(
                f"board fingerprint_version must be 1 or 2, got {fingerprint_version!r}"
            )
        self.board = {
            "status": "proposed",
            "artifact": artifact,
            "source_sha256": source_sha256,
            "fingerprint_version": fingerprint_version,
            "approved_by": None,
            "updated_at": utc_now_iso8601(),
        }
        if composition_ref is not None:
            self._validate_composition_ref(composition_ref)
            self.board["composition_ref"] = composition_ref

    def approve_board(self, approved_by: str) -> None:
        """Approve a proposed storyboard."""
        self._require_proposed_board()
        self.board["status"] = "approved"
        self.board["approved_by"] = approved_by
        self.board["updated_at"] = utc_now_iso8601()

    def set_board_story_gate(self, summary: dict) -> None:
        """
        gating projection only — the full verdict lives in the .verdict.json sidecar (SSOT).
        """
        if self.board is None:
            raise ValueError("board must exist")
        if not isinstance(summary, dict):
            raise TypeError("story gate summary must be a dict")
        required = {
            "mode",
            "route",
            "verdict_path",
            "verdict_hash",
            "severity",
            "confidence",
        }
        missing = required - set(summary)
        if missing:
            raise ValueError(
                f"story gate summary missing required keys: {sorted(missing)}"
            )

        # story_gate.ROUTES is the SSOT; core must not import _lib.
        routes = (
            "ok",
            "board_problem",
            "script_problem",
            "prompt_problem",
            "mixed",
            "judge_unavailable",
        )
        if summary["route"] not in routes:
            raise ValueError(
                "story gate route must be one of "
                f"{list(routes)}, got {summary['route']!r}"
            )
        for key in ("mode", "verdict_path", "verdict_hash"):
            if not isinstance(summary[key], str) or not summary[key]:
                raise ValueError(
                    f"story gate summary {key} must be a non-empty string"
                )

        self.board["story_gate"] = {
            **summary,
            "updated_at": utc_now_iso8601(),
        }

    def reject_board(self, approved_by: str) -> None:
        """Reject a proposed storyboard."""
        self._require_proposed_board()
        self.board["status"] = "rejected"
        self.board["approved_by"] = approved_by
        self.board["updated_at"] = utc_now_iso8601()

    def board_is_approved(self) -> bool:
        return self.board is not None and self.board["status"] == "approved"

    def _require_proposed_board(self) -> None:
        if self.board is None or self.board.get("status") != "proposed":
            raise ValueError("board must exist with status 'proposed'")

    @property
    def status(self) -> BeatStatus:
        """Derived workflow status. SSOT — never stored.

        Active takes win over `approved` so a beat that's still generating
        never appears finalized (race-condition guard for manual approvals
        landing while a take is in flight).
        """
        if any(t.status == "running" for t in self.takes):
            return "generating"
        if self.approved:
            return "approved"
        if not self.takes:
            return "not_started"
        if self.primary_take is not None:
            return "review"
        if self.is_exhausted:
            return "exhausted"
        return "pending"

    def __post_init__(self) -> None:
        if not isinstance(self.beat_id, str) or not self.beat_id:
            raise ValueError(
                f"Beat.beat_id must be a non-empty string, got {self.beat_id!r}"
            )
        if not isinstance(self.takes, list):
            raise TypeError(
                f"Beat.takes must be a list, got {type(self.takes).__name__}"
            )
        for i, t in enumerate(self.takes):
            if not isinstance(t, Take):
                raise TypeError(
                    f"Beat.takes[{i}] must be a Take, got {type(t).__name__}"
                )
        # Duplicate take_id detection — caught at construction time.
        seen = set()
        for t in self.takes:
            if t.take_id in seen:
                raise ValueError(
                    f"Duplicate take_id {t.take_id!r} in beat {self.beat_id!r}"
                )
            seen.add(t.take_id)
        # primary_take_id, if set, must reference an existing take.
        if self.primary_take_id is not None:
            if self.primary_take_id not in seen:
                raise ValueError(
                    f"Beat.primary_take_id {self.primary_take_id!r} does not match "
                    f"any take in beat {self.beat_id!r}. Takes: {sorted(seen)}"
                )
        if self.board is not None:
            self._validate_board(self.board)

    @staticmethod
    def _validate_board(board: dict) -> None:
        if not isinstance(board, dict):
            raise TypeError(
                f"Beat.board must be a dict or None, got {type(board).__name__}"
            )
        required = {"status", "artifact", "source_sha256", "approved_by", "updated_at"}
        missing = required - set(board)
        if missing:
            raise ValueError(f"Beat.board missing required keys: {sorted(missing)}")
        if board["status"] not in ("proposed", "approved", "rejected"):
            raise ValueError(
                "Beat.board status must be one of "
                "['proposed', 'approved', 'rejected']"
            )
        if not isinstance(board["artifact"], str) or not board["artifact"]:
            raise ValueError("Beat.board artifact must be a non-empty string")
        Beat._validate_board_artifact_path(board["artifact"], "Beat.board artifact")
        if not isinstance(board["source_sha256"], str) or not board["source_sha256"]:
            raise ValueError("Beat.board source_sha256 must be a non-empty string")
        if "fingerprint_version" in board:
            fingerprint_version = board["fingerprint_version"]
            if type(fingerprint_version) is not int or fingerprint_version not in (1, 2):
                raise ValueError(
                    "Beat.board fingerprint_version must be 1 or 2, "
                    f"got {fingerprint_version!r}"
                )
        approved_by = board["approved_by"]
        if approved_by is not None and not isinstance(approved_by, str):
            raise TypeError("Beat.board approved_by must be a string or None")
        if not isinstance(board["updated_at"], str) or not board["updated_at"]:
            raise ValueError("Beat.board updated_at must be a non-empty string")
        if "composition_ref" in board:
            Beat._validate_composition_ref(board["composition_ref"])

    @staticmethod
    def _validate_composition_ref(composition_ref: dict) -> None:
        """Validate the REC-240 composition_ref seam (additive; legacy boards omit it).

        Shape: ``{kind, members, layout, view, [atom_refs]}``. ``kind`` is a
        ``CompositionManifest`` kind; ``view`` is the rendered-view PNG path (the
        manifest's resolved artifact — for v1 the trivially-wrapped board PNG) and is
        project-relative-validated exactly like ``artifact``. core does not import the API
        schema (kept dependency-light like ``set_board_story_gate``'s inline route list);
        full manifest validation happens where the manifest is CONSTRUCTED/CONSUMED."""
        if not isinstance(composition_ref, dict):
            raise TypeError("Beat.board composition_ref must be a dict")
        kinds = ("CONT", "ONER", "COVERAGE", "BROLL", "PICKUP", "CUT")
        if composition_ref.get("kind") not in kinds:
            raise ValueError(
                f"Beat.board composition_ref kind must be one of {list(kinds)}, "
                f"got {composition_ref.get('kind')!r}"
            )
        members = composition_ref.get("members")
        if not isinstance(members, list) or any(
            not isinstance(m, str) or not m for m in members
        ):
            raise ValueError(
                "Beat.board composition_ref members must be a list of non-empty strings"
            )
        if not isinstance(composition_ref.get("layout"), dict):
            raise ValueError("Beat.board composition_ref layout must be a dict")
        view = composition_ref.get("view")
        if not isinstance(view, str) or not view:
            raise ValueError(
                "Beat.board composition_ref view must be a non-empty project-relative path"
            )
        Beat._validate_board_artifact_path(view, "Beat.board composition_ref view")

    @staticmethod
    def _validate_board_artifact_path(artifact: str, label: str) -> None:
        path = PurePath(artifact)
        if path.is_absolute():
            raise ValueError(f"{label} must be project-relative, not absolute")
        if ".." in path.parts:
            raise ValueError(f"{label} must stay within the project root")

    def add_take(self, take: Take) -> None:
        """Append an already-constructed Take. Validates take_id uniqueness."""
        if not isinstance(take, Take):
            raise TypeError(f"Beat.add_take requires a Take, got {type(take).__name__}")
        if any(t.take_id == take.take_id for t in self.takes):
            raise ValueError(
                f"Duplicate take_id {take.take_id!r} in beat {self.beat_id!r}"
            )
        self.takes.append(take)

    def new_take(
        self,
        *,
        workflow: Workflow,
        take_metadata: Optional[dict[str, Any]] = None,
    ) -> Take:
        """Construct + append a new Take with auto-assigned take_index/take_id.

        take_index is len(self.takes) at the moment of call. take_id is
        f"{beat_id}_take_{take_index}". Returns the new Take. The Take
        starts with status="pending"; caller invokes Take.execute to run.
        """
        idx = len(self.takes)
        take = Take(
            take_id=f"{self.beat_id}_take_{idx}",
            take_index=idx,
            workflow=workflow,
            take_metadata=dict(take_metadata or {}),
        )
        # LOCKED-2 from spec-review: route through self.add_take(), NOT
        # self.takes.append(). add_take performs duplicate take_id detection
        # (raises ValueError on collision). The auto-id pattern
        # `f"{beat_id}_take_{idx}"` collides if a caller mutates self.takes
        # externally (e.g., manually removing a take then calling new_take
        # — the new len() yields an idx that already exists). add_take's
        # duplicate check catches this at the point of insertion rather than
        # at next-validation time.
        self.add_take(take)
        return take

    @property
    def primary_take(self) -> Optional[Take]:
        """Return the primary Take object, or None if primary_take_id is unset
        or doesn't match a take (defensive — should be caught at __post_init__)."""
        if self.primary_take_id is None:
            return None
        for t in self.takes:
            if t.take_id == self.primary_take_id:
                return t
        return None

    def to_dict(self) -> dict[str, Any]:
        return {
            "beat_id": self.beat_id,
            "takes": [t.to_dict() for t in self.takes],
            "primary_take_id": self.primary_take_id,
            "beat_metadata": dict(self.beat_metadata),
            "created_at": self.created_at,
            "max_takes": self.max_takes,
            "approved": self.approved,
            "phantom_recovery_count": self.phantom_recovery_count,
            "board": dict(self.board) if self.board is not None else None,
        }

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "Beat":
        return cls(
            beat_id=d["beat_id"],
            takes=[Take.from_dict(t) for t in (d.get("takes") or [])],
            primary_take_id=d.get("primary_take_id"),
            beat_metadata=dict(d.get("beat_metadata") or {}),
            created_at=d.get("created_at") or utc_now_iso8601(),
            max_takes=int(d.get("max_takes", 3)),
            approved=bool(d.get("approved", False)),
            phantom_recovery_count=int(d.get("phantom_recovery_count", 0)),
            board=dict(d["board"]) if d.get("board") is not None else None,
        )


@dataclass
class Scene:
    """A thin grouping of Beats. CP-7 ships dataclass + serialization only.

    Fields: scene_id (recommended `ep{NNN}_sc{NN}`), beats (ordered),
    scene_metadata (free-form), created_at. No execution semantics.
    """

    scene_id: str
    beats: list[Beat] = field(default_factory=list)
    scene_metadata: dict[str, Any] = field(default_factory=dict)
    created_at: str = field(default_factory=utc_now_iso8601)
    locked: bool = False
    lock_reason: str | None = None
    locked_at: str | None = None
    locked_by: str | None = None

    def __post_init__(self) -> None:
        if not isinstance(self.scene_id, str) or not self.scene_id:
            raise ValueError(
                f"Scene.scene_id must be a non-empty string, got {self.scene_id!r}"
            )
        if not isinstance(self.beats, list):
            raise TypeError(
                f"Scene.beats must be a list, got {type(self.beats).__name__}"
            )
        for i, b in enumerate(self.beats):
            if not isinstance(b, Beat):
                raise TypeError(
                    f"Scene.beats[{i}] must be a Beat, got {type(b).__name__}"
                )
        seen = set()
        for b in self.beats:
            if b.beat_id in seen:
                raise ValueError(
                    f"Duplicate beat_id {b.beat_id!r} in scene {self.scene_id!r}"
                )
            seen.add(b.beat_id)

    def add_beat(self, beat: Beat) -> None:
        """Append an already-constructed Beat. Validates beat_id uniqueness."""
        if not isinstance(beat, Beat):
            raise TypeError(f"Scene.add_beat requires a Beat, got {type(beat).__name__}")
        if any(b.beat_id == beat.beat_id for b in self.beats):
            raise ValueError(
                f"Duplicate beat_id {beat.beat_id!r} in scene {self.scene_id!r}"
            )
        self.beats.append(beat)

    @property
    def status(self) -> SceneStatus:
        """Derived Scene status from member Beat states.

        - "final"        ↔ all Beats approved
        - "review"       ↔ any Beat in review (primary chosen, not yet approved)
        - "generating"   ↔ any Beat generating
        - "not_started"  ↔ all Beats untouched
        - "in_progress"  otherwise (mix of approved and unstarted/pending)
        """
        if not self.beats:
            return "not_started"
        if all(b.approved for b in self.beats):
            return "final"
        if any(b.status == "generating" for b in self.beats):
            return "generating"
        if any(b.status == "review" for b in self.beats):
            return "review"
        if all(b.status == "not_started" for b in self.beats):
            return "not_started"
        return "in_progress"

    def to_dict(self) -> dict[str, Any]:
        return {
            "scene_id": self.scene_id,
            "beats": [b.to_dict() for b in self.beats],
            "scene_metadata": dict(self.scene_metadata),
            "created_at": self.created_at,
            "locked": self.locked,
            "lock_reason": self.lock_reason,
            "locked_at": self.locked_at,
            "locked_by": self.locked_by,
        }

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "Scene":
        return cls(
            scene_id=d["scene_id"],
            beats=[Beat.from_dict(b) for b in (d.get("beats") or [])],
            scene_metadata=dict(d.get("scene_metadata") or {}),
            created_at=d.get("created_at") or utc_now_iso8601(),
            locked=bool(d.get("locked", False)),
            lock_reason=d.get("lock_reason"),
            locked_at=d.get("locked_at"),
            locked_by=d.get("locked_by"),
        )


# ──────────────────────────────────────────────────────────────────────
# Phase 3: Take.execute — wraps Workflow.run + compresses step status.
# ──────────────────────────────────────────────────────────────────────


def _compress_step_status(workflow: Workflow) -> TakeStatus:
    """Compress a workflow's step statuses into a take-level status.

    Algorithm:
      - all `succeeded` → `succeeded`
      - any `succeeded` AND any (`failed` OR `skipped`) → `partial`
      - none `succeeded` (all `failed` / `skipped` / `pending` / `running`)
        → `failed`
    """
    if not workflow.steps:
        # Defensive — Workflow.run validates non-empty before reaching here.
        return "failed"

    statuses = [s.status for s in workflow.steps]
    n_succeeded = sum(1 for st in statuses if st == "succeeded")
    n_total = len(statuses)

    if n_succeeded == n_total:
        return "succeeded"
    if n_succeeded == 0:
        return "failed"
    return "partial"


def _take_execute(
    take: "Take",
    *,
    context,
    pre_step: Optional[HookFn] = None,
    post_step: Optional[HookFn] = None,
    on_failure: Optional[HookFn] = None,
) -> "Take":
    """Execute the take's underlying Workflow and compress status.

    Calling Take.execute() twice on the same Take re-runs the underlying
    Workflow (CP-6 documents Workflow.run is re-run-safe via execution-
    state reset on each step). The editorial idiom for a re-attempt on
    the same beat is `Beat.new_take(workflow=fresh_workflow)`, NOT
    `take.execute()` twice — but both patterns are supported.

    Hook contract is identical to CP-6: hooks receive (step, workflow)
    and propagate exceptions strictly. CP-9 may add take-level hooks
    (`pre_take` / `post_take`) when eval primitives ship.

    Returns self (mutates take.status, take.workflow.steps in place).

    Exception safety (LOCKED-1 from spec-review): if `Workflow.run` raises
    (e.g. CP-6 hook-exception path at workflow.py:390-394), the `finally`
    block ensures Take.status lands in a terminal value via
    `_compress_step_status`. Without this, the Take would be stuck in
    "running" — same bug class as CP-6 post-build LOCKED-1. The compression
    function correctly handles partially-executed workflows because CP-6's
    upfront-reset leaves un-reached steps in "pending" → if any prior step
    succeeded, Take is "partial"; otherwise "failed".
    """
    take.status = "running"
    try:
        take.workflow.run(
            context=context,
            pre_step=pre_step,
            post_step=post_step,
            on_failure=on_failure,
        )
    finally:
        take.status = _compress_step_status(take.workflow)
    return take


# Bind execute as a method on Take. Set __name__ / __qualname__ so that
# help(Take.execute), repr(Take.execute), and IDE autocompletion show the
# canonical method name rather than the underscore-prefixed implementation.
_take_execute.__name__ = "execute"
_take_execute.__qualname__ = "Take.execute"
Take.execute = _take_execute  # type: ignore[assignment]


# ──────────────────────────────────────────────────────────────────────
# Phase 4: Beat.select_primary — primary-selection strategies.
# ──────────────────────────────────────────────────────────────────────


PrimaryStrategy = Literal["first_success", "manual", "score"]


def _beat_select_primary(
    beat: "Beat",
    *,
    strategy: PrimaryStrategy = "first_success",
) -> Optional[str]:
    """Select the primary Take for this Beat.

    Strategies:
      - "first_success" (CP-7 DEFAULT): iterate self.takes by take_index ASC,
        return the take_id of the first take with status="succeeded". Sets
        self.primary_take_id to the chosen id. If none succeeded, returns
        None and leaves self.primary_take_id unchanged.
      - "manual": no-op. Returns self.primary_take_id unchanged. Caller is
        expected to set self.primary_take_id directly before/after calling.
      - "score" (CP-9): compute aggregate_score on every take whose
        aggregate_score is None, then pick the take with the highest
        aggregate_score (ties broken by take_index ASC). Score-less takes
        sort below scored takes (effectively ineligible). If NO take
        produced a score, returns None and leaves self.primary_take_id
        unchanged — parallels first_success semantics. Does NOT raise.

    The strategy enum is forward-compat — adding strategies post-CP-9 is
    additive and safe; removing or renaming is a contract break.
    """
    if strategy == "score":
        # CP-9 score-based selection.
        #
        # Algorithm:
        #   1. For every Take in beat.takes, ensure aggregate_score is computed
        #      (calls compute_aggregate_score on takes whose aggregate_score is None).
        #   2. Partition takes into scored (aggregate_score is not None) and
        #      score-less (aggregate_score is None).
        #   3. Among scored takes: pick the highest aggregate_score; on ties,
        #      lowest take_index wins (ASC).
        #   4. If no scored takes exist, return None and leave
        #      beat.primary_take_id unchanged (parallels "first_success"
        #      semantics when nothing succeeded — CP-7 LOCKED behavior).
        #
        # NOTE: this branch DOES mutate Takes — compute_aggregate_score
        # side-effects each Take whose aggregate_score is None. That's
        # deliberate; CP-9 hand-off documents it. Pre-set aggregate_score
        # values are honored without recomputation.
        for t in beat.takes:
            if t.aggregate_score is None:
                t.compute_aggregate_score()
        scored = [t for t in beat.takes if t.aggregate_score is not None]
        if not scored:
            return None
        # Sort: scored-vs-unscored partitioning is implicit (we filtered to
        # scored only). Within scored: highest aggregate_score first, ties
        # broken by take_index ASC. Equivalent (per § 12d item 5) to a sort
        # key of (score is None, -score_or_zero, take_index) when applied
        # to the full beat.takes list.
        scored.sort(key=lambda t: (-t.aggregate_score, t.take_index))
        chosen = scored[0]
        beat.primary_take_id = chosen.take_id
        return chosen.take_id
    if strategy == "manual":
        return beat.primary_take_id
    if strategy == "first_success":
        # Iterate by take_index ASC. self.takes preserves insertion order,
        # which equals take_index for new_take-built Takes; an externally-
        # constructed Beat may have non-monotonic indices, so sort.
        ordered = sorted(beat.takes, key=lambda t: t.take_index)
        for t in ordered:
            if t.status == "succeeded":
                beat.primary_take_id = t.take_id
                return t.take_id
        return None
    raise ValueError(
        f"Unknown primary-selection strategy: {strategy!r}. "
        f"Supported: 'first_success', 'manual', 'score' (CP-9)."
    )


# Bind select_primary as a method on Beat. Set __name__ / __qualname__
# (LOCKED-4 from spec-review) so help(Beat.select_primary) and IDE
# autocompletion show the canonical method name.
_beat_select_primary.__name__ = "select_primary"
_beat_select_primary.__qualname__ = "Beat.select_primary"
Beat.select_primary = _beat_select_primary  # type: ignore[assignment]
