"""Eval primitive — EvalNode Protocol, PanelOfJudges, EvalRegistry (CP-9 Phase 3).

CP-9 deliverable. Replaces hardcoded single-critic patterns (legacy
``recoil/core/critic.py`` `CriticLoop` ABC) with a pluggable architecture:
any object implementing the :class:`EvalNode` Protocol can be a judge,
multiple judges form a :class:`PanelOfJudges`, and a panel scores a
:class:`GenerationReceipt` producing a ScoreCard that lands in
``receipt.eval_scores[panel_id]`` (in-place dict mutation — receipt is
``frozen=True`` but ``eval_scores`` is a mutable dict ref per CP-5).

Design contract (locked at CP-9 per loraverse SYNTHESIS Locked Decisions
#9 + #10 + Phase 1 audit § 12b corrections):

  - :class:`EvalNode` is a ``runtime_checkable`` Protocol — duck-typed; any
    class with ``judge_id``, ``model_used``, and ``evaluate(context)`` is a
    node.
  - :class:`EvalContext` field shape per Phase 3 hand-off brief:
    ``target_artifact_path: Path``, ``target_take`` (the Take being
    scored — Any / late-bound to avoid circular import), ``prompt: str``,
    ``rubric: str``, ``judge_id: str``, ``metadata: dict[str, Any]``,
    ``scene_takes: list`` (RESERVED — cross-take continuity critic, deferred
    to a later CP per SYNTHESIS Locked Decision #10).
  - :class:`PanelOfJudges` aggregates via ``"median"`` (default) or
    ``"mean"``. Outliers (any judge ≥ ``OUTLIER_THRESHOLD`` from aggregate)
    flagged in ``panel_warnings``. Cost cap aborts panel mid-run; partial
    scorecard returned with ``panel_warnings`` containing
    ``"cost_cap_aborted_at_judge_N"``.
  - Tournament / TournamentEliminator / CostGate explicitly DEFERRED.
  - :func:`attach_eval_hooks` returns a ``(pre_step, post_step, on_failure)``
    triple suitable for ``Workflow.run`` kwargs.
  - :class:`EvalRegistry` is keyed by ``judge_id: str``. Distinct from the
    CP-4 modality registry — no key/identifier overlap.

Public surface (locked at CP-9 Phase 3):
    EvalNode (Protocol)
    EvalResult (dataclass)
    EvalContext (dataclass with reserved scene_takes)
    PanelOfJudges (class)
    EvalRegistry / register_eval_node / get_eval_node / list_eval_nodes /
        is_eval_registered
    attach_eval_hooks (utility)

Phase 4 ships eval modality runners that wrap registered EvalNodes.
Phase 7 ships ``LegacyFlashCriticEvalNode`` adapter for the existing
``CriticLoop`` ABC. Adding a new judge type post-CP-9 is additive: register
a new EvalNode class (e.g. ClaudeSonnetEvalNode) via
``register_eval_node(judge_id, node)``.
"""

from __future__ import annotations

import logging
import statistics
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Optional, Protocol, runtime_checkable

from recoil.pipeline.core.cost import read_cost_from_record_safe

logger = logging.getLogger(__name__)


# ── Constants ─────────────────────────────────────────────────────────

#: Aggregation strategies supported by :class:`PanelOfJudges`. Validated in
#: ``PanelOfJudges.__post_init__`` (any other value raises ``ValueError``).
#: ``PanelOfJudges.score`` maps ``"median"`` to ``statistics.median`` and
#: ``"mean"`` to ``statistics.fmean``.
SUPPORTED_AGGREGATIONS: tuple[str, ...] = ("median", "mean")

#: Default outlier flagging threshold. Any judge whose score is ≥ this
#: absolute distance from the panel aggregate is flagged in ``panel_warnings``.
#: Per Phase 3 brief: "any judge >= 0.3 from median". Note that the
#: comparison in ``PanelOfJudges.score`` is made against whichever aggregate
#: the panel is configured with (median OR mean), and that the constant is
#: read directly from this module — there is no per-panel override.
OUTLIER_THRESHOLD: float = 0.3


# ── EvalContext ──────────────────────────────────────────────────────

@dataclass
class EvalContext:
    """Bundle of inputs handed to an :class:`EvalNode` for one evaluation.

    The field shape is locked by the Phase 3 hand-off brief (it supersedes
    the earlier draft shape in the BUILD_SPEC body).

    Fields:
        target_artifact_path: Filesystem location of the artifact to score.
            Any value that is not already a :class:`pathlib.Path`
            (typically a string) is passed through ``Path(...)`` during
            construction.
        target_take: The Take being scored. Annotated ``Any`` so this
            module does not have to import the Take type (late binding,
            avoids a ``pipeline.core.take`` circular import). Not validated.
        prompt: The generation prompt that produced the artifact. Must be a
            string; the empty string is accepted.
        rubric: The scoring rubric — what "good" means for this
            evaluation. Must be a non-empty string.
        judge_id: Stable id for the judge invocation. Must be a non-empty
            string.
        metadata: Free-form dict of judge-specific knobs (e.g.
            ``{"_transport": <mock>}`` in tests, ``"reason"`` for failure
            re-evaluation). Defaults to a fresh empty dict.
        scene_takes: RESERVED — sibling Takes from the same Scene, intended
            for cross-take continuity scoring. Designed in CP-9 per
            SYNTHESIS Locked Decision #10; implementation deferred to a
            later CP. CP-9 judges must NOT consume this field. Defaults to
            a fresh empty list.

    Raises:
        TypeError: ``prompt`` is not a string, ``metadata`` is not a dict,
            or ``scene_takes`` is not a list.
        ValueError: ``rubric`` or ``judge_id`` is not a non-empty string.

    The dataclass is deliberately NOT frozen: callers may mutate
    ``metadata`` between judge invocations within a panel run.
    """

    target_artifact_path: Path
    target_take: Any  # Take — late binding to avoid circular import
    prompt: str
    rubric: str
    judge_id: str
    metadata: dict[str, Any] = field(default_factory=dict)
    # reserved for cross-take continuity critic, deferred CP-N
    scene_takes: list = field(default_factory=list)

    def __post_init__(self) -> None:
        # Accept plain string paths for caller convenience.
        raw_path = self.target_artifact_path
        if not isinstance(raw_path, Path):
            self.target_artifact_path = Path(raw_path)

        if not isinstance(self.prompt, str):
            raise TypeError(
                f"EvalContext.prompt must be a string, "
                f"got {type(self.prompt).__name__}"
            )

        # Both of these must be strings with at least one character.
        required_text = (
            (self.rubric, "EvalContext.rubric must be a non-empty string"),
            (self.judge_id, "EvalContext.judge_id must be a non-empty string"),
        )
        for value, message in required_text:
            if not (isinstance(value, str) and value):
                raise ValueError(message)

        # Container fields must be exactly the documented container kind.
        container_checks = (
            (self.metadata, dict, "EvalContext.metadata must be a dict, "),
            (self.scene_takes, list, "EvalContext.scene_takes must be a list, "),
        )
        for value, expected, prefix in container_checks:
            if not isinstance(value, expected):
                raise TypeError(f"{prefix}got {type(value).__name__}")


# ── EvalResult ───────────────────────────────────────────────────────

@dataclass
class EvalResult:
    """One judge's verdict on one artifact.

    Locked schema (CP-9 Phase 3). Adding fields post-CP-9 is safe;
    renaming or removing fields is a contract break.

    Fields:
        score: number in ``[0.0, 1.0]``. This class only VALIDATES the
            range (it never clips); upstream adapters are expected to clip
            before constructing a result. ``bool`` and NaN are rejected.
        reasoning: free-form LLM critique (3-5 sentences typical). Not
            validated.
        judge_id: stable id for the judge that produced this result.
            Must be a non-empty string.
        model_used: which underlying model was called. Must be a string
            (may be empty).
        cost_usd: this judge's contribution to panel cost. Defaults to 0.0
            for non-LLM judges (e.g. legacy critic adapter at Phase 7).
            Not validated here; :meth:`to_dict` converts it with
            ``float(...)``.
        metadata: opaque dict (request_id, raw response excerpts, warnings,
            etc.).

    Raises:
        TypeError: ``score`` is a ``bool`` or not an int/float,
            ``model_used`` is not a string, or ``metadata`` is not a dict.
        ValueError: ``score`` is outside ``[0.0, 1.0]`` (including NaN), or
            ``judge_id`` is not a non-empty string.
    """

    score: float
    reasoning: str
    judge_id: str
    model_used: str
    cost_usd: float = 0.0
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if isinstance(self.score, bool) or not isinstance(self.score, (int, float)):
            # ``bool`` is a subclass of ``int`` in Python — reject it
            # explicitly; an EvalNode that returns True/False is buggy.
            raise TypeError(
                f"EvalResult.score must be a number (int/float), "
                f"got {type(self.score).__name__}"
            )
        # Positive range test on purpose: every ordering comparison with NaN
        # is False, so the former ``score < 0.0 or score > 1.0`` form let
        # ``float("nan")`` through and poisoned the panel aggregate.
        if not 0.0 <= self.score <= 1.0:
            raise ValueError(
                f"EvalResult.score must be in [0.0, 1.0], got {self.score}"
            )
        if not isinstance(self.judge_id, str) or not self.judge_id:
            raise ValueError("EvalResult.judge_id must be a non-empty string")
        if not isinstance(self.model_used, str):
            raise TypeError(
                f"EvalResult.model_used must be a string, "
                f"got {type(self.model_used).__name__}"
            )
        if not isinstance(self.metadata, dict):
            raise TypeError(
                f"EvalResult.metadata must be a dict, "
                f"got {type(self.metadata).__name__}"
            )

    def to_dict(self) -> dict[str, Any]:
        """Serialize for inclusion in a panel scorecard.

        ``score`` and ``cost_usd`` are coerced to ``float``; ``metadata`` is
        shallow-copied so later top-level mutation of the returned dict does
        not touch this instance.
        """
        return {
            "score": float(self.score),
            "reasoning": self.reasoning,
            "judge_id": self.judge_id,
            "model_used": self.model_used,
            "cost_usd": float(self.cost_usd),
            "metadata": dict(self.metadata),
        }

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "EvalResult":
        """Inverse of :meth:`to_dict`.

        ``score``, ``reasoning``, ``judge_id`` and ``model_used`` are
        required keys (``KeyError`` if absent). A missing or falsy
        ``metadata`` becomes ``{}``. ``cost_usd`` is obtained through
        ``read_cost_from_record_safe(d)`` — presumably that helper yields
        0.0 when the key is absent (forward compatibility with legacy
        scorecards); confirm against the helper. The usual
        ``__post_init__`` validation applies to the rebuilt instance.
        """
        return cls(
            score=float(d["score"]),
            reasoning=d["reasoning"],
            judge_id=d["judge_id"],
            model_used=d["model_used"],
            cost_usd=read_cost_from_record_safe(d),
            metadata=dict(d.get("metadata") or {}),
        )


# ── EvalNode Protocol ────────────────────────────────────────────────

@runtime_checkable
class EvalNode(Protocol):
    """Pluggable judge protocol.

    Any class with ``judge_id`` (str), ``model_used`` (str), and
    ``evaluate(context: EvalContext) -> EvalResult`` is an :class:`EvalNode`.
    The Protocol is ``runtime_checkable`` so ``isinstance(x, EvalNode)``
    works for duck-typed adapters (e.g. CP-9 Phase 7's
    ``LegacyFlashCriticEvalNode`` wrapping a ``CriticLoop`` instance).
    Both ``EvalRegistry.register`` and ``PanelOfJudges.__post_init__`` rely
    on that ``isinstance`` check to reject malformed judges.

    Note: ``runtime_checkable`` Protocol only checks for attribute
    *presence*, not signature shape. Callers wanting stronger guarantees
    should wrap registration in their own validation.
    """

    # Identifier of the judge; ``PanelOfJudges.score`` reads it to label the
    # warnings it emits for this judge.
    judge_id: str
    # Name of the underlying model the judge calls (see
    # ``EvalResult.model_used``).
    model_used: str

    def evaluate(self, context: EvalContext) -> EvalResult:
        ...  # pragma: no cover — Protocol stub


# ── EvalRegistry ─────────────────────────────────────────────────────


class _ClassOrInstanceMethod:
    """Descriptor that binds a method to the instance it is accessed on, or
    to the module's default registry when accessed on the class itself.

    Phase D MF-11: this is what allows :class:`EvalRegistry` to be a normal
    instantiable class (``EvalRegistry().register(...)`` works on
    per-instance state) while ``EvalRegistry.register(...)`` keeps behaving
    like the pre-MF-11 ``@staticmethod`` shim by routing to
    ``_default_eval_registry``.

    The dual dispatch preserves both the audit T5.11 cleanup and the
    existing test contract (see test_eval_registry_class_facade_*); without
    it, code calling ``EvalRegistry.register(...)`` as if it were static
    would break.
    """

    def __init__(self, func):
        self.func = func
        # Mirror the wrapped function's doc/name onto the descriptor so
        # introspection (e.g. hasattr checks) keeps working, falling back
        # to neutral values when the callable lacks them.
        for attr, fallback in (("__doc__", None), ("__name__", "method")):
            setattr(self, attr, getattr(func, attr, fallback))

    def __get__(self, instance, owner):
        # Delegate to the function's own ``__get__`` so the result is a real
        # bound method (correct ``__self__`` and a signature without
        # ``self``). The default registry global is resolved at access
        # time, only on the class-level path.
        if instance is None:
            return self.func.__get__(_default_eval_registry)
        return self.func.__get__(instance)


class EvalRegistry:
    """Registry of :class:`EvalNode` objects, with state held per instance.

    Phase D MF-11: previously a class facade over a shared module-level
    dict (audit T5.11 — two parallel public surfaces). The class is now the
    canonical surface, and the module-level free functions
    (:func:`register_eval_node` / :func:`get_eval_node` /
    :func:`list_eval_nodes` / :func:`is_eval_registered`) are thin wrappers
    around :data:`_default_eval_registry`. Hot-reload tooling (Console v2
    dev loop) can construct a fresh instance and swap it in.

    Entries are keyed by ``judge_id: str``. This is a separate namespace
    from the CP-4 modality registry
    (``pipeline.core.registry.RunnerRegistry``), even though both map
    strings to Protocol implementers.

    Every public method is wrapped in a descriptor so that class-level
    calls (``EvalRegistry.register(...)``) operate on the process default
    singleton — the pre-MF-11 ``@staticmethod`` facade contract — while
    instance-level calls (``EvalRegistry().register(...)``) operate on that
    instance's own mapping.

    Thread-safety: NOT thread-safe (no locking is performed). Callers
    serialize via process-level bootstrap.
    """

    def __init__(self) -> None:
        # judge_id -> registered node
        self._registry: dict[str, EvalNode] = {}

    @_ClassOrInstanceMethod
    def register(
        self,
        judge_id: str,
        node: EvalNode,
        *,
        force: bool = False,
    ) -> None:
        """Store ``node`` under ``judge_id``.

        Registering the SAME object again under the SAME id is accepted and
        leaves the registry unchanged. Registering a different object under
        an id that is already taken is an error unless ``force=True``.

        Args:
            judge_id: Non-empty string identifier.
            node: Object satisfying the :class:`EvalNode` Protocol (must
                expose ``judge_id``, ``model_used`` and ``evaluate``).
            force: Overwrite an existing registration when True. Default
                False.

        Raises:
            ValueError: ``judge_id`` is not a non-empty string, or another
                node already occupies ``judge_id`` and ``force`` is False.
            TypeError: ``node`` does not satisfy the EvalNode Protocol.
        """
        if not (isinstance(judge_id, str) and judge_id):
            raise ValueError("judge_id must be a non-empty string")
        if not isinstance(node, EvalNode):
            # The runtime-checkable Protocol only looks for the attributes,
            # so landing here means one of judge_id / model_used / evaluate
            # is missing on the object.
            raise TypeError(
                f"node {node!r} does not satisfy EvalNode protocol "
                f"(needs judge_id, model_used, evaluate)"
            )
        occupant = self._registry.get(judge_id)
        replacing_other = occupant is not None and occupant is not node
        if replacing_other and not force:
            raise ValueError(
                f"judge_id {judge_id!r} already registered to {occupant!r}; "
                f"pass force=True to override"
            )
        self._registry[judge_id] = node

    @_ClassOrInstanceMethod
    def get(self, judge_id: str) -> EvalNode:
        """Return the node registered under ``judge_id``.

        Raises :class:`KeyError` when nothing is registered under that id;
        the message lists the ids that ARE registered to aid debugging.
        """
        if judge_id in self._registry:
            return self._registry[judge_id]
        raise KeyError(
            f"no EvalNode registered under {judge_id!r}. "
            f"Registered: {sorted(self._registry)}"
        )

    @_ClassOrInstanceMethod
    def list(self) -> list[str]:
        """Return every registered judge id in ascending sorted order."""
        return sorted(self._registry)

    @_ClassOrInstanceMethod
    def is_registered(self, judge_id: str) -> bool:
        """Report whether ``judge_id`` has a registered node. Never raises
        for a missing id."""
        return judge_id in self._registry

    @_ClassOrInstanceMethod
    def reset(self) -> None:
        """Test-only: drop every registration. Do NOT call from production
        code."""
        self._registry.clear()


# Process-singleton — the canonical eval registry instance for this process.
# Hot-reload tooling can construct a fresh EvalRegistry() and reassign.
# Both the module-level wrapper functions and class-level calls such as
# ``EvalRegistry.register(...)`` (via ``_ClassOrInstanceMethod.__get__``)
# resolve this global at call time, so a reassignment is seen by every
# subsequent call.
_default_eval_registry = EvalRegistry()


# ── Module-level free functions — thin wrappers, preserved API ───────
def register_eval_node(
    judge_id: str,
    node: EvalNode,
    *,
    force: bool = False,
) -> None:
    """Process-singleton convenience wrapper around
    :meth:`EvalRegistry.register`. Tests/hot-reload should construct
    their own EvalRegistry() and call .register() directly.

    Args:
        judge_id: Non-empty string identifier for the node.
        node: Object satisfying the :class:`EvalNode` Protocol.
        force: When True, overwrite a different node already registered
            under ``judge_id``.

    Raises:
        ValueError: Propagated from :meth:`EvalRegistry.register` — empty
            or non-string ``judge_id``, or an id conflict without
            ``force=True``.
        TypeError: Propagated when ``node`` fails the EvalNode check.
    """
    # Forwarded unchanged to the process default registry.
    _default_eval_registry.register(judge_id, node, force=force)


def get_eval_node(judge_id: str) -> EvalNode:
    """Return the :class:`EvalNode` registered under ``judge_id`` in the
    process default registry.

    Thin wrapper around :meth:`EvalRegistry.get`; raises :class:`KeyError`
    (listing the registered ids) when ``judge_id`` is unknown.
    """
    return _default_eval_registry.get(judge_id)


def list_eval_nodes() -> list[str]:
    """Return the judge ids registered in the process default registry,
    sorted ascending. Thin wrapper around :meth:`EvalRegistry.list`."""
    return _default_eval_registry.list()


def is_eval_registered(judge_id: str) -> bool:
    """Return True iff ``judge_id`` is registered in the process default
    registry. Thin wrapper around :meth:`EvalRegistry.is_registered`; an
    unknown id yields False rather than an exception."""
    return _default_eval_registry.is_registered(judge_id)


def _reset_eval_registry_for_tests() -> None:
    """Test-only: clear the eval registry. Mirrors
    :func:`pipeline.core.registry._reset_for_tests`. Do NOT call from
    production code.

    Only the process default registry is cleared; separately constructed
    :class:`EvalRegistry` instances are untouched.
    """
    _default_eval_registry.reset()


# ── PanelOfJudges ────────────────────────────────────────────────────

@dataclass
class PanelOfJudges:
    """Multi-judge orchestrator. Default aggregation: ``"median"``.

    Fields:
        panel_id: Stable id for the panel; appears as the key in
            ``receipt.eval_scores[panel_id]``. Non-empty string.
        judges: List of :class:`EvalNode` instances (length ≥ 1).
        aggregation: ``"median"`` (default) or ``"mean"``. Validated
            against ``SUPPORTED_AGGREGATIONS``.
        cost_cap_usd: Hard cap on accumulated judge cost within one
            ``score()`` call. ``None`` disables the cap. The cap is probed
            BEFORE each judge is invoked: once the cost accumulated from
            the judges already run has reached or exceeded the cap, the
            remaining judges are NOT invoked and ``panel_warnings`` gains
            ``"cost_cap_aborted_at_judge_N"`` (N = index of the first judge
            that was not run). The probe does not project the next judge's
            cost, so the final total may overshoot the cap by the cost of
            the last judge that ran. A cap of ``0`` aborts before the first
            judge. Must be a non-negative number; ``bool`` and NaN are
            rejected (a NaN cap would never trigger).

    Raises (at construction):
        ValueError: empty / non-string ``panel_id``, empty or non-list
            ``judges``, unsupported ``aggregation``, negative or NaN
            ``cost_cap_usd``.
        TypeError: a judge fails the :class:`EvalNode` check, or
            ``cost_cap_usd`` is neither a number nor ``None``.

    Re-running ``.score()`` on the same instance is safe (no internal
    state). Per Phase 3 brief: ``score(receipt, context) -> dict``
    returning a ScoreCard with keys:
    ``panel_id`` / ``panel_score`` / ``panel_warnings`` / ``judges`` /
    ``aggregation`` / ``panel_cost_usd``.
    """

    panel_id: str
    judges: list[EvalNode]
    aggregation: str = "median"
    cost_cap_usd: Optional[float] = None

    def __post_init__(self) -> None:
        if not isinstance(self.panel_id, str) or not self.panel_id:
            raise ValueError(
                "PanelOfJudges.panel_id must be a non-empty string"
            )
        if not isinstance(self.judges, list) or len(self.judges) < 1:
            raise ValueError(
                "PanelOfJudges.judges must be a non-empty list of EvalNode"
            )
        for i, j in enumerate(self.judges):
            if not isinstance(j, EvalNode):
                raise TypeError(
                    f"PanelOfJudges.judges[{i}] does not satisfy EvalNode "
                    f"protocol (needs judge_id, model_used, evaluate)"
                )
        if self.aggregation not in SUPPORTED_AGGREGATIONS:
            raise ValueError(
                f"PanelOfJudges.aggregation must be one of "
                f"{SUPPORTED_AGGREGATIONS}, got {self.aggregation!r}"
            )
        if self.cost_cap_usd is not None:
            # ``bool`` is an ``int`` subclass — exclude it explicitly.
            if isinstance(self.cost_cap_usd, bool) or \
                    not isinstance(self.cost_cap_usd, (int, float)):
                raise TypeError(
                    f"PanelOfJudges.cost_cap_usd must be a number or None, "
                    f"got {type(self.cost_cap_usd).__name__}"
                )
            # Positive form so NaN (all comparisons False) is rejected too;
            # ``cap < 0`` would accept NaN and silently disable the cap.
            if not self.cost_cap_usd >= 0:
                raise ValueError(
                    "PanelOfJudges.cost_cap_usd must be >= 0 or None"
                )

    def score(
        self,
        receipt: Any,  # GenerationReceipt — late-binding (unused in body)
        context: EvalContext,
    ) -> dict[str, Any]:
        """Run all judges, aggregate, return ScoreCard dict.

        Per Phase 3 brief, ``receipt`` is accepted in the signature but
        the panel itself does not read it — judges read ``context``. The
        receipt is in the signature so callers (especially the
        :func:`attach_eval_hooks` post-step) have a uniform call shape and
        future extensions (e.g. per-judge receipt-conditioning) can land
        without a signature break.

        Returns shape:

            {
                "panel_id": str,
                "panel_score": float | None,    # None if no judges produced a result
                "panel_warnings": list[str],
                "judges": list[dict],           # each per EvalResult.to_dict()
                "aggregation": "median" | "mean",
                "panel_cost_usd": float,
            }

        Failure modes within ``score()``:
          - Per-judge exception → judge contributes NOTHING to the
            scorecard (skipped); ``panel_warnings`` appended with judge id
            + error class. Other judges continue.
          - Judge returns non-:class:`EvalResult` → skipped; warning
            appended.
          - Judge returns an :class:`EvalResult` that cannot be serialized
            or whose ``cost_usd`` is not convertible to ``float`` (e.g.
            ``None``) → skipped; warning
            ``"judge_N_<id>_returned_invalid_EvalResult:..."`` appended.
            Other judges continue.
          - cost_cap reached BEFORE invoking next judge → remaining
            judges NOT invoked; ``panel_warnings`` appended
            ``"cost_cap_aborted_at_judge_N"``.
          - All judges errored / cap aborted before first run →
            ``panel_score=None``; ``panel_warnings`` carries every error.

        Outlier warnings use the index of the judge within the returned
        ``judges`` list (i.e. among judges that produced a result), which
        differs from the panel position when earlier judges were skipped.
        """
        results: list[dict[str, Any]] = []
        warnings: list[str] = []
        accumulated_cost: float = 0.0

        for idx, judge in enumerate(self.judges):
            # Cost-cap probe BEFORE invoking the next judge. Per Phase 3
            # brief: hard-abort when accumulated cost has already reached
            # or exceeded the cap. This means cost_cap=0 aborts the panel
            # immediately (first probe sees accumulated_cost==0 >= 0).
            if (
                self.cost_cap_usd is not None
                and accumulated_cost >= self.cost_cap_usd
            ):
                warnings.append(f"cost_cap_aborted_at_judge_{idx}")
                break
            try:
                result = judge.evaluate(context)
            except Exception as exc:  # noqa: BLE001
                judge_label = getattr(judge, "judge_id", f"judges[{idx}]")
                warnings.append(
                    f"judge_{idx}_{judge_label}_errored:"
                    f"{type(exc).__name__}:{exc}"
                )
                logger.warning(
                    "PanelOfJudges %s: judge %d (%s) errored: %s",
                    self.panel_id, idx, judge_label, exc,
                )
                continue
            if not isinstance(result, EvalResult):
                judge_label = getattr(judge, "judge_id", f"judges[{idx}]")
                warnings.append(
                    f"judge_{idx}_{judge_label}_returned_non_EvalResult:"
                    f"{type(result).__name__}"
                )
                continue
            # EvalResult does not validate ``cost_usd``, so serialization /
            # float conversion can still fail (e.g. ``cost_usd=None``).
            # Treat that as a per-judge failure instead of letting it take
            # down the whole panel. Both values are computed before either
            # is recorded so a failing judge leaves no partial trace.
            try:
                entry = result.to_dict()
                judge_cost = float(result.cost_usd)
            except (TypeError, ValueError) as exc:
                judge_label = getattr(judge, "judge_id", f"judges[{idx}]")
                warnings.append(
                    f"judge_{idx}_{judge_label}_returned_invalid_EvalResult:"
                    f"{type(exc).__name__}:{exc}"
                )
                logger.warning(
                    "PanelOfJudges %s: judge %d (%s) returned an unusable "
                    "EvalResult: %s",
                    self.panel_id, idx, judge_label, exc,
                )
                continue
            results.append(entry)
            accumulated_cost += judge_cost

        if not results:
            # Nothing to aggregate: every judge failed or the cap tripped
            # before the first one ran.
            return {
                "panel_id": self.panel_id,
                "panel_score": None,
                "panel_warnings": warnings,
                "judges": [],
                "aggregation": self.aggregation,
                "panel_cost_usd": float(accumulated_cost),
            }

        scores = [r["score"] for r in results]
        if self.aggregation == "median":
            agg = float(statistics.median(scores))
        elif self.aggregation == "mean":
            agg = float(statistics.fmean(scores))
        else:
            # Defensive — __post_init__ validates, but the dataclass is not
            # frozen, so the attribute may have been reassigned since.
            raise ValueError(
                f"unsupported aggregation {self.aggregation!r}"
            )

        # Outlier flagging — locked format per Phase 3 brief:
        # ``f"outlier:judge_{i}:{score}"`` is one of the suggested forms;
        # we use a slightly richer variant with judge_id (more useful in
        # logs) plus the i index for traceability. Format LOCKED here.
        for i, r in enumerate(results):
            if abs(r["score"] - agg) >= OUTLIER_THRESHOLD:
                warnings.append(
                    f"outlier:judge_{i}:{r['judge_id']}:"
                    f"score_{r['score']:.3f}_vs_aggregate_{agg:.3f}"
                )

        return {
            "panel_id": self.panel_id,
            "panel_score": agg,
            "panel_warnings": warnings,
            "judges": results,
            "aggregation": self.aggregation,
            "panel_cost_usd": float(accumulated_cost),
        }


# ── attach_eval_hooks utility ────────────────────────────────────────

def attach_eval_hooks(
    workflow: Any,  # Workflow — late binding
    panel: PanelOfJudges,
    *,
    rubric_per_modality: Optional[dict[str, str]] = None,
    eval_on_failure: bool = True,
) -> tuple[Callable, Callable, Callable]:
    """Create the ``(pre_step, post_step, on_failure)`` hook triple that
    scores step outputs with ``panel``; suitable for ``Workflow.run`` kwargs.

    Each hook has the CP-6 signature ``(step, workflow) -> None``:
      - **pre_step:** does nothing (reserved for pre-gen prompt eval if a
        future CP enables it).
      - **post_step:** acts only when ``step.status == "succeeded"``. If
        the step has a receipt whose ``run_result.output_path`` is set and
        its modality is eval-able (image_t2i / video_i2v / audio_t2a /
        lipsync_post), an :class:`EvalContext` is built and
        ``panel.score(receipt, context)`` is called. The scorecard is
        stored in ``step.receipt.eval_scores[panel.panel_id]`` by in-place
        dict mutation (per audit § 12e — receipt is ``frozen=True`` but
        ``eval_scores`` is a mutable dict ref), and the scorecard's
        ``panel_cost_usd`` is added to
        ``receipt.provenance["eval_cost_usd"]``.
      - **on_failure:** when ``eval_on_failure=True``, performs the same
        evaluation with ``metadata["reason"]="step failed"`` (diagnostic
        use). Nothing happens when there is no output_path to score.

    Any exception raised while evaluating a step is caught, logged at
    WARNING level and swallowed. Per audit § 12e: a hook that raises would
    crash the executor, hiding eval bugs behind generation failures.

    Args:
        workflow: The Workflow the hooks will be attached to. Not used by
            the hooks today (reserved for a future per-workflow knob); it
            is in the signature so callers can write
            ``Workflow.run(*attach_eval_hooks(wf, panel))``.
        panel: The :class:`PanelOfJudges` invoked for each eligible step.
        rubric_per_modality: Rubric text per eval target. Keys:
            ``"image"``, ``"video"``, ``"audio"``. When omitted or empty,
            the embedded baseline rubrics are used. When provided, it is
            used as the COMPLETE map: a target that is missing from it (or
            mapped to an empty rubric) is simply not evaluated.
        eval_on_failure: When True (default), failed steps that still have
            an output artifact are scored as well. When False,
            ``on_failure`` is a no-op (still returned for signature
            symmetry).

    Returns:
        ``(pre_step, post_step, on_failure)`` — three callables matching
        the CP-6 ``HookFn`` signature.
    """
    if rubric_per_modality:
        # Copy so later caller-side mutation cannot affect the hooks.
        rubrics = dict(rubric_per_modality)
    else:
        rubrics = {
            "image": (
                "Score this image's compositional quality, lighting, and "
                "continuity against the prompt. Respond with JSON "
                '{"score": float in [0,1], "reasoning": "..."}.'
            ),
            "video": (
                "Score this video's motion plausibility, temporal "
                "consistency, and adherence to the source intent. Respond "
                'with JSON {"score": float in [0,1], "reasoning": "..."}.'
            ),
            "audio": (
                "Score this audio's intelligibility, voice match, and "
                "emotional fit. Respond with JSON "
                '{"score": float in [0,1], "reasoning": "..."}.'
            ),
        }

    # The modality names come from the registry module rather than being
    # spelled out here, so a rename of the registry constants cannot rot
    # silently. Eval modalities are intentionally absent from the table —
    # eval outputs are never scored recursively (per audit § 12c).
    from recoil.pipeline.core.registry import (
        MODALITY_AUDIO_T2A,
        MODALITY_IMAGE_T2I,
        MODALITY_LIPSYNC_POST,
        MODALITY_VIDEO_I2V,
    )
    eval_target_by_modality: dict[str, str] = {
        MODALITY_IMAGE_T2I: "image",
        MODALITY_VIDEO_I2V: "video",
        MODALITY_AUDIO_T2A: "audio",
        MODALITY_LIPSYNC_POST: "video",  # output is a video
    }

    def _score_step(step: Any, _workflow: Any, *, reason: Optional[str] = None) -> None:
        # Shared worker for post_step / on_failure. Everything lives inside
        # one try so that no failure can escape into the executor.
        try:
            receipt = getattr(step, "receipt", None)
            if receipt is None:
                return

            modality = getattr(step, "modality", None)
            if not isinstance(modality, str):
                return
            target = eval_target_by_modality.get(modality)
            if target is None:
                return  # eval modalities themselves + anything unknown

            run_result = getattr(receipt, "run_result", None)
            if not run_result:
                return
            artifact = getattr(run_result, "output_path", None)
            if not artifact:
                return  # nothing to score

            rubric = rubrics.get(target)
            if not rubric:
                return

            # Recover the generation prompt from the step payload when one
            # is present; fall back to the empty string.
            payload = getattr(step, "payload", {}) or {}
            prompt = payload.get("prompt") or payload.get("text") or ""
            if not isinstance(prompt, str):
                prompt = str(prompt)

            context = EvalContext(
                target_artifact_path=Path(artifact),
                target_take=None,  # Phase 4+ runners may pass through
                prompt=prompt,
                rubric=rubric,
                judge_id=panel.panel_id,
                metadata={"reason": reason} if reason else {},
            )
            scorecard = panel.score(receipt, context)

            # The receipt is frozen, so its dicts are mutated in place.
            receipt.eval_scores[panel.panel_id] = scorecard
            # Eval spend is tracked cumulatively in provenance, separately
            # from generation cost.
            prior_cost = float(receipt.provenance.get("eval_cost_usd") or 0.0)
            added_cost = float(scorecard.get("panel_cost_usd") or 0.0)
            receipt.provenance["eval_cost_usd"] = prior_cost + added_cost
        except Exception as exc:  # noqa: BLE001 — hooks must NOT raise
            logger.warning(
                "attach_eval_hooks: panel %s eval swallowed %s: %s",
                panel.panel_id, type(exc).__name__, exc,
            )

    def pre_step(step: Any, workflow_arg: Any) -> None:
        # Placeholder for pre-gen prompt eval (deferred CP-N).
        return None

    def post_step(step: Any, workflow_arg: Any) -> None:
        succeeded = getattr(step, "status", None) == "succeeded"
        if succeeded:
            _score_step(step, workflow_arg)

    def on_failure(step: Any, workflow_arg: Any) -> None:
        if not eval_on_failure:
            return
        _score_step(step, workflow_arg, reason="step failed")

    return (pre_step, post_step, on_failure)


# Names exported by ``from <this module> import *``. The test-only helper
# ``_reset_eval_registry_for_tests`` is listed despite its leading
# underscore, so a star-import exposes it as well.
__all__ = [
    "EvalContext",
    "EvalResult",
    "EvalNode",
    "PanelOfJudges",
    "EvalRegistry",
    "register_eval_node",
    "get_eval_node",
    "list_eval_nodes",
    "is_eval_registered",
    "attach_eval_hooks",
    "SUPPORTED_AGGREGATIONS",
    "OUTLIER_THRESHOLD",
    "_reset_eval_registry_for_tests",
]
