"""dispatch() — single entry point for every generation call.

Wraps the CP-4 modality registry with:
- Lazy per-process bootstrap (register_default_runners on first call)
- DispatchContext-driven caller identification
- RunResult → GenerationReceipt wrapping
- JSONL audit log emission (best-effort; never blocks dispatch)
- StepRunner._dispatch_path stamping for sidecar provenance

Public surface:
    dispatch(modality, payload, *, context) -> GenerationReceipt
    register_default_runners(step_runner, *, force=False)  # canonical home

Old import paths still work via re-exports in pipeline/core/__init__.py
and pipeline/core/runners/__init__.py — both deprecated, removed one
cycle out (CP-6 / Phase 9).
"""

from __future__ import annotations

import json
import logging
import os
import sys
import threading
import time
from pathlib import Path
from typing import Any, Optional

from recoil.core.paths import RECOIL_ROOT
from recoil.pipeline.core.dispatch_context import DispatchContext
from recoil.pipeline.core.receipts import GenerationReceipt, make_receipt_id, utc_now_iso8601
from recoil.pipeline.core.registry import (
    RunResult,
    get_runner,
    is_registered,
    MODALITY_IMAGE_T2I,
    MODALITY_STORYBOARD,
    MODALITY_VIDEO_I2V,
)


logger = logging.getLogger(__name__)

_BOOTSTRAP_LOCK = threading.Lock()
# id(step_runner) → bootstrap memo. The registry holds references to the runner
# instances (which transitively reference the StepRunner), so id() reuse after
# GC cannot occur in practice — bootstrapped StepRunners outlive the process.
# Side effect: each distinct StepRunner ever passed to dispatch() is pinned.
# Long-lived servers that construct per-request StepRunners should pass a
# shared StepRunner via DispatchContext and put per-request state in payload
# / provenance_overrides.
_BOOTSTRAPPED_FOR: set[int] = set()


def _default_log_path() -> Path:
    """Default JSONL receipts log path: $RECOIL_ROOT/_dispatch_logs/receipts.jsonl."""
    return RECOIL_ROOT / "_dispatch_logs" / "receipts.jsonl"


def _emit_receipt_jsonl(receipt: GenerationReceipt, log_path: Optional[str]) -> None:
    """Append one JSONL line to receipts.jsonl. Cross-process safe.

    Best-effort — never raises (per CP-9 contract: receipts log failure must
    not block dispatch). The lock acquisition itself is mandatory — the
    audited failure mode is "lock missing", not "lock acquisition flakes".
    """
    try:
        if log_path in ("", "DISABLED"):
            return
        target = Path(log_path) if log_path else _default_log_path()
        line = json.dumps(receipt.to_dict()) + "\n"
        from recoil.pipeline._lib.jsonl_append import append_jsonl_locked
        append_jsonl_locked(target, line)
    except Exception as e:  # noqa: BLE001
        # Best-effort. Receipts log writes must not block dispatch.
        sys.stderr.write(f"[dispatch] receipt log write failed: {e}\n")


def _emit_eventbus_hook(receipt: GenerationReceipt) -> None:
    """Phase 19 — feature-flagged EventBus emit, no-op without RECOIL_EVENTBUS_ENABLED=1.

    NEVER raises. NEVER blocks. NEVER imports the recoil.api.* package
    unless the feature flag is set — keeping the engine→api import edge
    out of the default test environment (Track B preservation gate).

    Severity is derived from receipt status:
      • ``RunResult.success is True``   → "success"
      • ``RunResult.success is False``  → "failure"
      • else                            → "info"

    Scope is built from ``project / ep<NN> / shot_id`` if available,
    falling back to ``"engine"``.
    """
    if os.environ.get("RECOIL_EVENTBUS_ENABLED") != "1":
        return
    try:
        # Lazy import — engine package must not depend on recoil.api at module
        # load time (api → engine is the canonical direction). The hook only
        # imports the bus if the operator opted in.
        from recoil.api.eventbus import BUS

        run_result = receipt.run_result
        success = getattr(run_result, "success", None)
        if success is True:
            severity = "success"
        elif success is False:
            severity = "failure"
        else:
            severity = "info"

        scope_parts: list[str] = []
        if receipt.project:
            scope_parts.append(str(receipt.project))
        if receipt.episode is not None:
            try:
                scope_parts.append(f"ep{int(receipt.episode):02d}")
            except (TypeError, ValueError):
                scope_parts.append(str(receipt.episode))
        if receipt.shot_id:
            scope_parts.append(str(receipt.shot_id))
        scope = " / ".join(scope_parts) if scope_parts else "engine"

        summary = f"{receipt.modality}: {severity}"
        BUS.emit_sync(
            severity=severity,  # type: ignore[arg-type]
            scope=scope,
            summary=summary,
            payload={
                "receipt_id": receipt.receipt_id,
                "modality": receipt.modality,
                "caller_id": receipt.caller_id,
            },
        )
    except Exception as e:  # noqa: BLE001
        # Hook MUST be no-op-on-failure; never let api errors block dispatch.
        sys.stderr.write(f"[dispatch] eventbus hook failed: {e}\n")


def register_default_runners(step_runner, *, force: bool = False) -> None:
    """Register the LIVE runners bound to a StepRunner.

    Canonical home post-CP-5. The pipeline/core/runners/__init__.py
    re-export was removed in engine-fix Phase D Phase 8.

    Also re-registers the CP-4 stubs (audio_t2a, lipsync_post) idempotently
    so the dispatch surface is complete even after `_reset_for_tests()`
    has cleared the registry.
    """
    if step_runner is None:
        raise ValueError("register_default_runners requires a step_runner")
    # Avoid circular import at module load: import inside the function.
    from recoil.pipeline.core.runners.audio_runner import AudioRunner
    from recoil.pipeline.core.runners.image_runner import ImageRunner
    from recoil.pipeline.core.runners.lipsync_post import LipSyncPostProcessor
    from recoil.pipeline.core.runners.r2v_multi_runner import R2VMultiRunner
    from recoil.pipeline.core.registry import MODALITY_R2V_MULTI
    from recoil.pipeline.core.runners.storyboard_runner import StoryboardRunner
    from recoil.pipeline.core.runners.video_runner import VideoRunner
    from recoil.pipeline.core.registry import (
        register_runner,
        MODALITY_AUDIO_T2A,
        MODALITY_LIPSYNC_POST,
    )

    if force or not is_registered(MODALITY_IMAGE_T2I):
        register_runner(MODALITY_IMAGE_T2I, ImageRunner(step_runner), force=force)
    if force or not is_registered(MODALITY_VIDEO_I2V):
        register_runner(MODALITY_VIDEO_I2V, VideoRunner(step_runner), force=force)
    if force or not is_registered(MODALITY_R2V_MULTI):
        register_runner(MODALITY_R2V_MULTI, R2VMultiRunner(step_runner), force=force)
    # CP-9 calls this bootstrap a frozen surface, but r2v_multi was added
    # here post-CP-5; extending it is the established precedent for new
    # production modalities.
    if force or not is_registered(MODALITY_STORYBOARD):
        register_runner(MODALITY_STORYBOARD, StoryboardRunner(step_runner), force=force)
    if force or not is_registered(MODALITY_AUDIO_T2A):
        register_runner(MODALITY_AUDIO_T2A, AudioRunner(), force=force)
    if force or not is_registered(MODALITY_LIPSYNC_POST):
        register_runner(MODALITY_LIPSYNC_POST, LipSyncPostProcessor(), force=force)


def _ensure_bootstrap(context: DispatchContext) -> None:
    """Lazy bootstrap: register live runners once per StepRunner instance."""
    with _BOOTSTRAP_LOCK:
        sr_id = id(context.step_runner)
        if sr_id in _BOOTSTRAPPED_FOR:
            return
        try:
            context.step_runner._dispatch_path = context.caller_id
        except (AttributeError, TypeError) as e:
            sys.stderr.write(
                f"[pipeline.core.dispatch] could not stamp _dispatch_path on "
                f"{type(context.step_runner).__name__}: {e}\n"
            )
        register_default_runners(context.step_runner)
        _BOOTSTRAPPED_FOR.add(sr_id)


def _validate_model_constraints(
    model_id, payload: dict, profile: dict
) -> None:
    """Raises ModelConstraintError if payload violates supports_transparent_bg
    or multi_ref_supported flags declared in model_profiles.json.

    Reads both ``image_urls`` (fal adapter key) and ``identity_refs``
    (ImageRunner upstream key) so multi-ref is caught at the dispatcher
    boundary rather than failing with an opaque fal 400.
    """
    from recoil.pipeline.core.exceptions import ModelConstraintError

    if not profile.get("supports_transparent_bg", True) and payload.get("background") == "transparent":
        raise ModelConstraintError(
            f"[{model_id}] transparent background not supported"
        )
    refs = payload.get("image_urls") or payload.get("identity_refs") or []
    if not profile.get("multi_ref_supported", True) and isinstance(refs, list) and len(refs) > 1:
        raise ModelConstraintError(
            f"[{model_id}] multi-ref not supported "
            f"(max 1, got {len(refs)})"
        )


class PayloadValidationError(ValueError):
    """Raised by _validate_payload when a payload violates its modality contract.

    Subclass of ValueError so existing handlers continue to function.
    Caught only by tests that want to assert on the typed exception.
    """


def _validate_payload(modality: str, payload: dict) -> None:
    """Modality-keyed payload contract check at the dispatch boundary.

    Bug C-derived guard (2026-05-19): the historical i2v→t2v silent degrade
    happened because dispatch_payload wrote `image` (base64) while video_runner
    read `start_frame` (Path). The keys never lined up; no exception fired.
    This validator runs BEFORE runner lookup so contract violations surface
    with a typed error pointing at the actual payload keys.

    Contract:
        - modality == "video_i2v": if `start_frame` is set, it MUST exist.
          If NOT set (t2v via video_i2v modality), the call is permitted.
          If `image` key present but no `start_frame`, raise to catch
          callers still using the legacy shape.
        - modality == "r2v_multi" without `reference_images`: log WARNING
          (not a hard block — env-only multi-shot passes have no characters).
    """
    if modality == "video_i2v":
        sf = payload.get("start_frame")
        if sf is not None:
            try:
                if not Path(sf).exists():
                    raise PayloadValidationError(
                        f"video_i2v start_frame does not exist: {sf!r}"
                    )
            except (TypeError, ValueError) as e:
                raise PayloadValidationError(
                    f"video_i2v start_frame is not a valid path-like value "
                    f"({type(sf).__name__}: {sf!r}): {e}"
                )
        elif "image" in payload:
            raise PayloadValidationError(
                "video_i2v payload contains legacy 'image' key without "
                "'start_frame' — caller must migrate to start_frame: Path. "
                f"payload keys: {sorted(payload.keys())}"
            )
    elif modality == "r2v_multi":
        refs = payload.get("reference_images")
        if not refs:
            logger.warning(
                "r2v_multi dispatched with zero reference_images "
                "(shot_id=%s, model=%s) — identity will be prompt-only",
                payload.get("shot_id"),
                payload.get("model"),
            )
    elif modality == "storyboard":
        required = {
            "shot_id",
            "prompt",
            "model",
            "size_override",
            "save_dir",
            "filename_stem",
        }
        missing = sorted(required - set(payload.keys()))
        if missing:
            raise PayloadValidationError(
                "storyboard payload missing required keys: "
                f"{missing}; payload keys: {sorted(payload.keys())}"
            )


def dispatch(
    modality: str,
    payload: dict,
    *,
    context: DispatchContext,
) -> GenerationReceipt:
    """Dispatch one generation call. Single entry point — every caller goes here."""
    if not isinstance(payload, dict):
        raise TypeError(f"dispatch payload must be a dict, got {type(payload).__name__}")
    if not isinstance(context, DispatchContext):
        raise TypeError(
            f"dispatch context must be a DispatchContext, got {type(context).__name__}"
        )

    _ensure_bootstrap(context)

    # Bug C guard (2026-05-19): validate payload shape BEFORE runner lookup so
    # the failure points at the contract violation, not at a fal 400.
    _validate_payload(modality, payload)

    runner = get_runner(modality)

    # Build A Phase 2 (2026-05-09): inject aspect_ratio from Project SSOT
    # when payload omits it. Prevents silent 9:16 downgrade for client_video
    # projects. Per SYNTHESIS lock 11.
    if "aspect_ratio" not in payload and context.project:
        try:
            from recoil.core.project import get_project
            payload = dict(payload)  # don't mutate caller's dict
            payload["aspect_ratio"] = get_project(context.project).aspect_ratio
        except Exception as e:
            logger.warning(
                "dispatch: could not inject aspect_ratio for project=%s: %s",
                context.project, e,
            )
            # Don't swallow — let runner raise MissingAspectRatioError downstream

    # Profile-flag-driven capability check (gpt-image-2 wire, 2026-05-18).
    # Future single-ref / no-transparent models inherit by setting the
    # same flags in model_profiles.json — no new code needed.
    model_id = payload.get("model")
    if model_id:
        try:
            from recoil.core.model_profiles import get_profile
            profile = get_profile(model_id)
        except Exception:
            profile = None
        if profile:
            _validate_model_constraints(model_id, payload, profile)

    started_us = int(time.time() * 1_000_000)

    shot_id = payload.get("shot_id")
    try:
        run_result: RunResult = runner.run(payload)
    except NotImplementedError as e:
        from recoil.pipeline.core.runners._shared import _failure_metadata_production
        md = _failure_metadata_production()
        md["final_state"] = "stub_not_implemented"
        run_result = RunResult(
            id=f"{shot_id or 'unknown'}_{modality}_stub",
            modality=modality,
            output_path=None,
            output_url=None,
            metadata=md,
            success=False,
            error=f"NotImplementedError: {e}",
        )

    finished_us = int(time.time() * 1_000_000)

    rid = make_receipt_id(shot_id, modality)
    base_provenance: dict[str, Any] = {
        "dispatch_path": context.caller_id,
        "started_us": started_us,
        "finished_us": finished_us,
        "wall_us": finished_us - started_us,
        "payload_keys": sorted(k for k in payload.keys() if not k.startswith("_")),
        "model": payload.get("model"),
    }
    base_provenance.update(context.provenance_overrides)

    receipt = GenerationReceipt(
        receipt_id=rid,
        modality=modality,
        caller_id=context.caller_id,
        project=context.project,
        episode=context.episode,
        shot_id=shot_id,
        timestamp_utc=utc_now_iso8601(),
        run_result=run_result,
        provenance=base_provenance,
        eval_scores={},  # CP-9 fills
    )

    _emit_receipt_jsonl(receipt, context.receipts_log_path)
    # Phase 19: feature-flagged EventBus emit; no-op without env var.
    _emit_eventbus_hook(receipt)
    return receipt


def _reset_bootstrap_for_tests() -> None:
    """Test-only helper. Clears the per-process bootstrap memo."""
    with _BOOTSTRAP_LOCK:
        _BOOTSTRAPPED_FOR.clear()


# ── CP-9 Phase 4: opt-in eval-runner registration ───────────────────────
#
# Sibling of :func:`register_default_runners` — does NOT modify it (the CP-5
# 4-modality bootstrap remains a frozen surface). Eval runners require
# GEMINI_API_KEY at the EvalNode layer, so auto-registration via dispatch()
# would break test environments without the key. Callers opt in explicitly.
def register_default_eval_runners(*, force: bool = False) -> None:
    """Register the three CP-9 eval runners. Opt-in (NOT called by ``dispatch()``).

    Sibling of :func:`register_default_runners`. Eval runners do NOT need a
    ``step_runner`` because they delegate to
    :func:`execution.providers.gemini_vision.score_artifact` directly via the
    EvalNode they wrap — hence the signature is zero-arg-other-than-``force``
    (per audit § 12c R3).

    Callers MUST also register at least one EvalNode under each runner's
    default ``judge_id`` (``eval_image_v1`` / ``eval_video_v1`` /
    ``eval_audio_v1``) — or pass an explicit ``judge_id`` in the dispatch
    payload. Production callers typically register a
    :class:`GeminiVisionEvalNode` per modality:

        from recoil.pipeline.core.runners import GeminiVisionEvalNode
        from recoil.pipeline.core import register_eval_node
        register_eval_node(
            "eval_image_v1",
            GeminiVisionEvalNode(artifact_modality="image",
                                 judge_id="eval_image_v1"),
        )

    Each call constructs FRESH runner instances. Calling
    ``register_default_eval_runners()`` twice raises ``KeyError`` on the
    second call (per :func:`pipeline.core.registry.register_runner` —
    different-instance re-registration without ``force=True`` raises).
    Pass ``force=True`` to overwrite the prior registration.
    """
    from recoil.pipeline.core.registry import (
        register_runner,
        MODALITY_EVAL_IMAGE_V1,
        MODALITY_EVAL_VIDEO_V1,
        MODALITY_EVAL_AUDIO_V1,
    )
    from recoil.pipeline.core.runners.eval_image_runner import EvalImageRunner
    from recoil.pipeline.core.runners.eval_video_runner import EvalVideoRunner
    from recoil.pipeline.core.runners.eval_audio_runner import EvalAudioRunner

    register_runner(MODALITY_EVAL_IMAGE_V1, EvalImageRunner(), force=force)
    register_runner(MODALITY_EVAL_VIDEO_V1, EvalVideoRunner(), force=force)
    register_runner(MODALITY_EVAL_AUDIO_V1, EvalAudioRunner(), force=force)
