"""VideoModelClient — the thin client atop the provider adapter registry.

Responsibilities (and ONLY these):
  1. Accept a dict payload from callers (today: step_runner.py).
  2. Translate dict -> UnifiedVideoPayload.
  3. Ask registry for primary adapter + tier.
  4. Drive submit / poll / result via the adapter's request descriptors.
  5. Write observability rows on every terminal event.
  6. On primary failure, look up the declared fallback and log it. Inline
     fallback retry is NOT implemented; callers retry by re-invoking submit().
  7. Return a recoil.core.jobs.Job that the ExecutionStore understands.

It does NOT know about fal.ai, atlas, or piapi specifics — that lives
in the adapters.
"""

from __future__ import annotations

import json
import logging
import os
import time
import urllib.error
import urllib.request
from typing import Optional

from recoil.core import model_profiles
from recoil.core.jobs import Job
from recoil.execution.providers import (
    PollRequest,
    PollResult,
    ProviderCapabilityError,
    SubmitRequest,
    UnifiedVideoPayload,
    resolve_adapter,
    resolve_fallback,
)
from recoil.execution.providers.payload_hints import PayloadHints
from recoil.execution.providers import observability as _obs

logger = logging.getLogger(__name__)

RECOVERY_DEADLINE_S = 1200
RECOVERY_BACKOFF_INITIAL_S = 30
RECOVERY_BACKOFF_FACTOR = 1.5
RECOVERY_BACKOFF_CAP_S = 180
_RECOVERABLE_RUNNING_STATUSES = {"IN_PROGRESS", "RUNNING", "QUEUED", "PENDING"}
_TERMINAL_FAILED_STATUSES = {"FAILED", "CANCELLED", "CANCELED"}


# Imported lazily to avoid circulars:
#   from recoil.execution.types import GenerationResult
#   from recoil.execution.lib.http_helpers import _download_video
#
# CP-2 Phase 8 — GenerationResult moved to execution.types; _download_video
# moved to execution.lib.http_helpers. ModelClient was dropped — VideoModelClient
# does not subclass it and the placeholder slot was unused.


def _lazy_imports():
    """Import execution-layer helpers on demand (avoids circular imports).

    Returns a 3-tuple ``(GenerationResult, None, _download_video)``. The
    middle slot is always ``None``: it used to carry the dropped
    ``ModelClient`` base and is kept only so existing 3-way unpacking works.
    """
    from recoil.execution.types import GenerationResult
    from recoil.execution.lib.http_helpers import _download_video

    retired_model_client_slot = None
    bundle = (GenerationResult, retired_model_client_slot, _download_video)
    return bundle


def _orphan_recovery_deadline_s() -> int:
    raw = os.environ.get("RECOIL_ORPHAN_RECOVERY_S")
    if raw is None:
        return RECOVERY_DEADLINE_S
    try:
        value = int(raw)
    except (TypeError, ValueError) as e:
        raise ValueError(
            "RECOIL_ORPHAN_RECOVERY_S must be a non-negative integer"
        ) from e
    if value < 0:
        raise ValueError(
            "RECOIL_ORPHAN_RECOVERY_S must be a non-negative integer"
        )
    return value


# ------------------------------------------------------------------
# Transport
# ------------------------------------------------------------------


def _http(
    req_method: str,
    url: str,
    headers: dict,
    body: Optional[dict] = None,
    timeout: int = 30,
) -> dict:
    data = json.dumps(body).encode() if body is not None else None
    r = urllib.request.Request(url, data=data, headers=headers, method=req_method)
    try:
        with urllib.request.urlopen(r, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        err_body = e.read().decode() if e.fp else ""
        raise RuntimeError(f"HTTP {e.code} at {url}: {err_body}") from e


# ------------------------------------------------------------------
# Payload coercion
# ------------------------------------------------------------------


def _dict_to_unified(payload: dict, model_id: str) -> UnifiedVideoPayload:
    if not isinstance(payload, dict):
        raise TypeError(
            f"VideoModelClient: expected dict, got {type(payload).__name__}"
        )
    return UnifiedVideoPayload(
        prompt=payload.get("prompt", ""),
        duration_s=int(payload.get("duration") or 5),
        resolution=payload.get("resolution") or "720p",
        aspect_ratio=payload.get("aspect_ratio"),
        image=payload.get("image"),
        image_tail=payload.get("image_tail"),
        reference_images=list(payload.get("reference_images") or []),
        reference_videos=list(payload.get("reference_videos") or []),
        reference_audios=list(payload.get("reference_audios") or []),
        generate_audio=bool(payload.get("generate_audio", False)),
        negative_prompt=payload.get("negative_prompt"),
        shot_id=payload.get("shot_id"),
        # Phase C (T2.21/MF-10): hints can be a typed PayloadHints model,
        # a legacy dict, or None. Pass-through preserves the model for the
        # adapter; legacy dict callers get a defensive copy. Adapters call
        # `coerce_to_dict(payload.hints)` to normalize at the read boundary.
        hints=(
            payload["hints"]
            if isinstance(payload.get("hints"), PayloadHints)
            else (dict(payload["hints"]) if payload.get("hints") else None)
        ),
        model_id=model_id,
    )


# ------------------------------------------------------------------
# The client
# ------------------------------------------------------------------


class VideoModelClient:
    """Model-agnostic video client driven by the provider adapter registry.

    Callers instantiate one per model (default: "seeddance-2.0"). Internally
    each submit() resolves the primary adapter at call time (so env overrides
    take effect without rebuilding the client); the tier is fixed at
    construction and passed to the registry explicitly.

    Lifecycle: submit() returns a Job with the adapter and provider job
    attached; wait_for_job() polls it to a terminal state, records an
    observability row, and stores a GenerationResult on the Job. flora jobs
    that look orphaned (FAILED without a provider error, COMPLETED without an
    output URL, or still running / unreachable at timeout) get an extra
    recovery polling phase bounded by RECOIL_ORPHAN_RECOVERY_S.

    On failure a configured fallback provider is only logged; inline fallback
    retry is not implemented, so callers retry by re-invoking submit().
    """

    def __init__(self, model_id: str = "seeddance-2.0", tier: Optional[str] = None):
        self._model_id = model_id
        # CP-2 spec-review edit #7 — tier is ALWAYS passed as a kwarg, NOT via
        # RECOIL_PROVIDER_TIER_OVERRIDE. ProductionLoop runs phases via
        # ThreadPoolExecutor; per-thread env mutation races invalidate env-var
        # tier selection. Stored here, threaded through to resolve_adapter()
        # in submit().
        self._tier = tier
        # Set to True by _finalize_completed after a successful generation.
        self._verified = False

    # ------------------------------------------------------------------
    # Availability / submission
    # ------------------------------------------------------------------

    def is_available(self) -> bool:
        """Return True if any adapter serving this model has its key env var set.

        Adapters without an ``auth_env_var`` are skipped. Passing ``None``
        to ``os.environ.get`` would raise TypeError.
        """
        from recoil.execution.providers.registry import list_adapters

        for a in list_adapters():
            if self._model_id not in a.supported_models:
                continue
            env_var = getattr(a, "auth_env_var", None)
            if env_var and os.environ.get(env_var):
                return True
        return False

    def submit(self, payload: dict) -> Job:
        """Validate *payload*, route it to the primary adapter and submit it.

        Routing failures (ProviderCapabilityError) and submit failures are
        recorded on the returned Job (``mark_failed`` plus a failed
        GenerationResult) rather than raised.

        Raises:
            TypeError: if *payload* is not a dict.
            ValueError: if the requested resolution is not listed in the
                model profile's ``supported_resolutions`` (default
                ``["720p"]``).
        """
        GenerationResult, _, _ = _lazy_imports()
        unified = _dict_to_unified(payload, self._model_id)

        # Validate resolution against model profile BEFORE routing.
        profile = model_profiles.get_profile(self._model_id)
        supported_res = profile.get("supported_resolutions", ["720p"])
        if unified.resolution not in supported_res:
            raise ValueError(
                f"{self._model_id} does not support resolution {unified.resolution!r}; "
                f"valid: {supported_res}"
            )

        job = Job.create(model=self._model_id, client=self)

        try:
            adapter, tier = resolve_adapter(self._model_id, unified, tier=self._tier)
        except ProviderCapabilityError as e:
            job.mark_failed(str(e))
            job.result = GenerationResult(
                success=False, model=self._model_id, error=str(e)
            )
            return job

        self._submit_via(adapter, tier, unified, job)
        return job

    def _submit_via(
        self, adapter, tier: str, unified: UnifiedVideoPayload, job: Job
    ) -> None:
        """Submit *unified* through *adapter* and bind the provider job to *job*.

        On success, stores the adapter and provider job on *job*, along with
        the legacy ``_provider`` / ``_fal_*`` attributes. On any exception,
        records a SUBMIT_FAILED observability row and marks *job* failed.
        Nothing is raised.
        """
        GenerationResult, _, _ = _lazy_imports()
        start = time.time()
        try:
            req: SubmitRequest = adapter.build_submit(unified, tier)
            # CP-2 bugfix — sentinel URL interception. Wan (fal-sdk://*) and
            # Google (googleapi://*) bypass HTTP transport: the adapter exposes
            # direct_subscribe_* / direct_submit_* methods that handle the
            # entire submit (and, for Wan + Google-image, the entire job) via
            # the provider SDK. Without this branch, urllib crashes with
            # "URLError: unknown url type" on the first dispatch.
            url = req.url
            if url.startswith("fal-sdk://"):
                if "wan-i2v" in url:
                    resp = adapter.direct_subscribe_i2v(unified)
                elif "wan-r2v" in url:
                    resp = adapter.direct_subscribe_r2v(unified)
                else:
                    raise RuntimeError(
                        f"Unknown fal-sdk sentinel URL: {url}"
                    )
            elif url.startswith("googleapi://"):
                if "image-synchronous" in url:
                    resp = adapter.direct_submit_image(unified)
                elif "video-veo" in url:
                    resp = adapter.direct_submit_video(unified)
                else:
                    raise RuntimeError(
                        f"Unknown googleapi sentinel URL: {url}"
                    )
            else:
                resp = _http(
                    req.method, req.url, req.headers, req.body, timeout=60
                )
            pj = adapter.parse_submit(resp, unified, tier)
            # Stash adapter handles on the Job so wait_for_job can resume.
            job.job_id = pj.native_id
            job.status = "submitted"
            job._provider_adapter = adapter  # type: ignore[attr-defined]
            job._provider_job = pj  # type: ignore[attr-defined]
            # Legacy attributes consumed by step_runner / api_client callers:
            job._provider = adapter.provider_id  # type: ignore[attr-defined]
            job._tier = tier  # type: ignore[attr-defined]
            job._duration_s = pj.duration_s  # type: ignore[attr-defined]
            job._resolution = pj.resolution  # type: ignore[attr-defined]
            job._fal_model_path = pj.native_state.get("model_path")  # type: ignore[attr-defined]
            job._fal_status_url = pj.native_state.get("status_url")  # type: ignore[attr-defined]
            job._fal_response_url = pj.native_state.get("response_url")  # type: ignore[attr-defined]
            job._shot_id = unified.shot_id  # type: ignore[attr-defined]
            logger.info(
                "VideoModelClient: %s submitted via %s (tier=%s) -> %s",
                self._model_id,
                adapter.provider_id,
                tier,
                pj.native_id,
            )
        except Exception as e:
            err = str(e)
            latency = int((time.time() - start) * 1000)
            _obs.record_call(
                provider=getattr(adapter, "provider_id", "?"),
                model=self._model_id,
                tier=tier,
                status="SUBMIT_FAILED",
                duration_s=unified.duration_s,
                listed_cost=None,
                observed_cost=None,
                latency_ms=latency,
                task_id=None,
                shot_id=unified.shot_id,
                error=err,
            )
            logger.error(
                "VideoModelClient submit via %s failed: %s",
                getattr(adapter, "provider_id", "?"),
                err,
            )
            job.mark_failed(err)
            job.result = GenerationResult(
                success=False, model=self._model_id, error=err
            )

    # ------------------------------------------------------------------
    # Polling
    # ------------------------------------------------------------------

    def wait_for_job(self, job: Job, timeout_s: int = 600, on_status=None):
        """Poll *job* until it reaches a terminal state or *timeout_s* elapses.

        Args:
            job: a Job produced by submit() (adapter handles attached).
            timeout_s: polling budget in seconds. Sleeps are clamped to the
                remaining budget, so the loop does not overrun it by a full
                backoff interval.
            on_status: optional callback invoked with each new status string.
                Exceptions it raises are logged and ignored.

        Returns:
            The GenerationResult, which is also stored on the Job. A job that
            already holds a failed result returns it without polling.
        """
        GenerationResult, _, _ = _lazy_imports()
        if job.result and not job.result.success:
            return job.result

        adapter = getattr(job, "_provider_adapter", None)
        pj = getattr(job, "_provider_job", None)
        if adapter is None or pj is None:
            err = "wait_for_job called on a job with no adapter bound"
            job.mark_failed(err)
            job.result = GenerationResult(
                success=False, model=self._model_id, error=err
            )
            return job.result

        start = time.time()
        interval = 5
        last_status = None
        last_result = None
        last_poll_exception = None
        poll_count = 0
        while time.time() - start < timeout_s:
            poll_count += 1
            # build_poll stays outside the try (as before): a request-building
            # error propagates to the caller instead of being retried to timeout.
            poll: PollRequest = adapter.build_poll(pj)
            try:
                result = self._send_poll(adapter, pj, poll)
            except Exception as e:
                last_poll_exception = e
                logger.warning("VideoModelClient poll error (%d): %s", poll_count, e)
                self._sleep_within_budget(start, timeout_s, interval)
                interval = min(interval * 1.5, 60)
                continue

            last_poll_exception = None
            last_result = result
            if on_status and result.status != last_status:
                self._notify_status(on_status, result.status)
                last_status = result.status

            if result.status == "COMPLETED":
                if self._should_recover_completed_no_output(adapter, pj, result):
                    return self._recover_orphaned_run(
                        adapter,
                        pj,
                        job,
                        submit_start=start,
                        on_status=on_status,
                        last_status=last_status,
                        initial_result=result,
                    )
                return self._finalize_completed(
                    adapter, pj, job, result, submit_start=start
                )
            # Every terminal failure status (FAILED / CANCELLED / CANCELED)
            # ends polling for every provider. A cancelled job can never
            # complete, so continuing to poll only burns the timeout budget.
            # Only flora FAILED results without a provider error enter recovery.
            if result.status in _TERMINAL_FAILED_STATUSES:
                if self._should_recover_failed_poll(adapter, pj, result):
                    return self._recover_orphaned_run(
                        adapter,
                        pj,
                        job,
                        submit_start=start,
                        on_status=on_status,
                        last_status=last_status,
                        initial_result=result,
                    )
                return self._finalize_failed(
                    adapter, pj, job, result, submit_start=start
                )

            self._sleep_within_budget(start, timeout_s, interval)
            interval = min(interval * 1.5, 60)

        if self._should_recover_timeout(
            adapter, pj, last_result, last_poll_exception
        ):
            return self._recover_orphaned_run(
                adapter,
                pj,
                job,
                submit_start=start,
                on_status=on_status,
                last_status=last_status,
                initial_result=last_result,
            )

        # Timeout — log, attempt best-effort cancel, and record observability.
        listed_cost, observed_cost = self._submit_costs(adapter, pj)
        native_state = getattr(pj, "native_state", {}) or {}
        native_id = getattr(pj, "native_id", None)
        model_path = native_state.get("model_path")
        response_url = native_state.get("response_url")
        cancel_fn = getattr(getattr(adapter, "transport", None), "cancel", None)
        if callable(cancel_fn) and model_path and native_id:
            try:
                cancel_fn(model_path, native_id)
            except Exception as e:
                logger.warning(
                    "VideoModelClient timeout cancel failed "
                    "(provider=%s, model_path=%s, native_id=%s): %s",
                    getattr(adapter, "provider_id", "?"),
                    model_path,
                    native_id,
                    e,
                )

        err = (
            f"{self._model_id} job {job.job_id} timed out after {timeout_s}s "
            f"via {adapter.provider_id} "
            f"(native_id={native_id}, response_url={response_url})"
        )
        _obs.record_call(
            provider=adapter.provider_id,
            model=self._model_id,
            tier=pj.tier,
            status="TIMEOUT",
            duration_s=pj.duration_s,
            listed_cost=listed_cost,
            observed_cost=observed_cost,
            latency_ms=int((time.time() - start) * 1000),
            task_id=native_id,
            shot_id=getattr(job, "_shot_id", None),
            error=err,
        )
        job.mark_failed(err)
        job.result = GenerationResult(
            success=False,
            model=self._model_id,
            error=err,
            cost=self._pick_result_cost(listed_cost, observed_cost),
        )
        return job.result

    @staticmethod
    def _notify_status(on_status, status) -> None:
        """Invoke the caller's status callback; its failures never abort polling."""
        try:
            on_status(status)
        except Exception:
            logger.debug(
                "VideoModelClient on_status callback raised for status %r",
                status,
                exc_info=True,
            )

    @staticmethod
    def _send_poll(adapter, pj, poll: PollRequest):
        """Execute an already-built poll request and parse it with the adapter.

        CP-2 bugfix: sentinel poll URLs (fal-sdk://, googleapi://) signal that
        the job is already in hand (Wan blocking-subscribe / Google
        synchronous image). The HTTP call is skipped and parse_poll works from
        native_state; urllib would otherwise fail with "unknown url type".
        """
        if poll.url.startswith(("fal-sdk://", "googleapi://")):
            resp = {}
        else:
            resp = _http(poll.method, poll.url, poll.headers, None, timeout=30)
        return adapter.parse_poll(resp, pj)

    def _poll_provider_job(self, adapter, pj):
        """Build and execute one poll (recovery path: build errors propagate to the caller's try)."""
        return self._send_poll(adapter, pj, adapter.build_poll(pj))

    def _sleep_within_budget(self, started_at, budget_s, interval):
        """Sleep *interval* seconds, clamped to what remains of *budget_s*."""
        remaining_s = budget_s - (time.time() - started_at)
        if remaining_s > 0:
            time.sleep(min(interval, remaining_s))

    def _sleep_until_recovery_deadline(self, recovery_start, deadline_s, interval):
        """Recovery-phase name for _sleep_within_budget."""
        self._sleep_within_budget(recovery_start, deadline_s, interval)

    # ------------------------------------------------------------------
    # flora orphan-recovery predicates
    # ------------------------------------------------------------------

    def _is_flora_recovery_candidate(self, adapter, pj) -> bool:
        """True for flora jobs that have a native id to keep polling."""
        return (
            getattr(adapter, "provider_id", None) == "flora"
            and bool(getattr(pj, "native_id", None))
        )

    def _flora_poll_has_provider_error(self, poll_result) -> bool:
        """True if the poll result or its raw dict carries any error field."""
        raw = getattr(poll_result, "raw", None) or {}
        if not isinstance(raw, dict):
            raw = {}
        return bool(
            getattr(poll_result, "error", None)
            or raw.get("error")
            or raw.get("error_message")
            or raw.get("error_code")
        )

    def _flora_raw_status(self, poll_result) -> str:
        """Lower-cased raw provider status, or "" when raw is not a dict."""
        raw = getattr(poll_result, "raw", None) or {}
        if not isinstance(raw, dict):
            return ""
        return str(raw.get("status") or "").lower()

    def _should_recover_failed_poll(self, adapter, pj, poll_result) -> bool:
        """Recover a flora FAILED poll that has no error and no explicit raw failure."""
        if not self._is_flora_recovery_candidate(adapter, pj):
            return False
        if getattr(poll_result, "status", None) != "FAILED":
            return False
        if self._flora_poll_has_provider_error(poll_result):
            return False
        if self._flora_raw_status(poll_result) in {"failed", "cancelled", "canceled"}:
            return False
        return _orphan_recovery_deadline_s() > 0

    def _should_recover_completed_no_output(self, adapter, pj, poll_result) -> bool:
        """Recover a flora COMPLETED poll that lacks an output video URL."""
        if not self._is_flora_recovery_candidate(adapter, pj):
            return False
        if getattr(poll_result, "status", None) != "COMPLETED":
            return False
        if getattr(poll_result, "video_url", None):
            return False
        return _orphan_recovery_deadline_s() > 0

    def _should_recover_timeout(
        self, adapter, pj, last_result, last_poll_exception
    ) -> bool:
        """Recover a flora job that timed out while unreachable or still running."""
        if not self._is_flora_recovery_candidate(adapter, pj):
            return False
        if last_poll_exception is not None:
            return _orphan_recovery_deadline_s() > 0
        if last_result is None:
            return False
        status = str(getattr(last_result, "status", "") or "").upper()
        if status in _RECOVERABLE_RUNNING_STATUSES:
            return _orphan_recovery_deadline_s() > 0
        return False

    def _recover_orphaned_run(
        self,
        adapter,
        pj,
        job,
        *,
        submit_start,
        on_status=None,
        last_status=None,
        initial_result=None,
    ):
        """Keep polling a possibly-orphaned job with exponential backoff.

        Waits start at RECOVERY_BACKOFF_INITIAL_S and grow by
        RECOVERY_BACKOFF_FACTOR up to RECOVERY_BACKOFF_CAP_S, all within
        _orphan_recovery_deadline_s() seconds. A COMPLETED poll with an output
        URL finalizes as success. A terminal failure that is not itself
        recoverable finalizes as failed. At the deadline, the job fails as
        completed_no_output if any poll (or *initial_result*) was COMPLETED
        without output; otherwise it is recorded as ORPHANED.
        """
        deadline_s = _orphan_recovery_deadline_s()
        recovery_start = time.time()
        interval = RECOVERY_BACKOFF_INITIAL_S
        last_result = initial_result
        completed_no_output = (
            getattr(initial_result, "status", None) == "COMPLETED"
            and not getattr(initial_result, "video_url", None)
        )

        while time.time() - recovery_start < deadline_s:
            try:
                result = self._poll_provider_job(adapter, pj)
            except Exception as e:
                logger.warning(
                    "VideoModelClient recovery poll error "
                    "(provider=%s, native_id=%s): %s",
                    getattr(adapter, "provider_id", "?"),
                    getattr(pj, "native_id", None),
                    e,
                )
                self._sleep_until_recovery_deadline(
                    recovery_start, deadline_s, interval
                )
                interval = min(
                    interval * RECOVERY_BACKOFF_FACTOR, RECOVERY_BACKOFF_CAP_S
                )
                continue

            last_result = result
            if on_status and result.status != last_status:
                self._notify_status(on_status, result.status)
                last_status = result.status

            if result.status == "COMPLETED":
                if result.video_url:
                    logger.warning(
                        "recovered orphaned run %s after %ds",
                        getattr(pj, "native_id", None),
                        int(time.time() - recovery_start),
                    )
                    return self._finalize_completed(
                        adapter, pj, job, result, submit_start=submit_start
                    )
                completed_no_output = True
            elif result.status in _TERMINAL_FAILED_STATUSES:
                if result.status == "FAILED" and self._should_recover_failed_poll(
                    adapter, pj, result
                ):
                    self._sleep_until_recovery_deadline(
                        recovery_start, deadline_s, interval
                    )
                    interval = min(
                        interval * RECOVERY_BACKOFF_FACTOR, RECOVERY_BACKOFF_CAP_S
                    )
                    continue
                return self._finalize_failed(
                    adapter, pj, job, result, submit_start=submit_start
                )

            self._sleep_until_recovery_deadline(recovery_start, deadline_s, interval)
            interval = min(
                interval * RECOVERY_BACKOFF_FACTOR, RECOVERY_BACKOFF_CAP_S
            )

        if completed_no_output:
            return self._finalize_completed_no_output(
                adapter, pj, job, last_result, submit_start
            )
        return self._finalize_orphaned(adapter, pj, job, submit_start, deadline_s)

    # ------------------------------------------------------------------
    # Finalization
    # ------------------------------------------------------------------

    def _poll_url_for_orphan(self, adapter, pj) -> Optional[str]:
        """Poll URL for error messages: native_state["poll_url"], else build_poll().url, else None."""
        native_state = getattr(pj, "native_state", {}) or {}
        poll_url = native_state.get("poll_url")
        if poll_url:
            return poll_url
        try:
            return adapter.build_poll(pj).url
        except Exception:
            return None

    def _submit_costs(self, adapter, pj):
        """Return ``(listed_cost, observed_cost)`` for *pj*.

        listed_cost comes from the adapter's rate card. observed_cost is
        native_state["charged_cost"] as a float, or None when it is absent or
        not numeric.
        """
        profile = model_profiles.get_profile(self._model_id)
        listed_cost = adapter.compute_cost(pj.duration_s, pj.tier, profile)
        observed_cost = None
        native_state = getattr(pj, "native_state", {}) or {}
        charged_cost = native_state.get("charged_cost")
        if charged_cost is not None:
            try:
                observed_cost = float(charged_cost)
            except (TypeError, ValueError):
                observed_cost = None
        return listed_cost, observed_cost

    @staticmethod
    def _pick_result_cost(listed_cost, observed_cost):
        """Prefer a positive provider-observed cost; otherwise the listed cost."""
        if observed_cost is not None and observed_cost > 0:
            return observed_cost
        return listed_cost

    def _finalize_orphaned(self, adapter, pj, job, submit_start, deadline_s):
        """Record an ORPHANED failure once the recovery deadline has passed."""
        GenerationResult, _, _ = _lazy_imports()
        listed_cost, observed_cost = self._submit_costs(adapter, pj)
        native_id = getattr(pj, "native_id", None)
        poll_url = self._poll_url_for_orphan(adapter, pj)
        err = (
            f"{self._model_id} job {job.job_id} orphaned after recovery "
            f"deadline {deadline_s}s via {adapter.provider_id} "
            f"(native_id={native_id}, poll_url={poll_url}; "
            "recoverable: job may still complete server-side)"
        )
        _obs.record_call(
            provider=adapter.provider_id,
            model=self._model_id,
            tier=pj.tier,
            status="ORPHANED",
            duration_s=pj.duration_s,
            listed_cost=listed_cost,
            observed_cost=observed_cost,
            latency_ms=int((time.time() - submit_start) * 1000),
            task_id=native_id,
            shot_id=getattr(job, "_shot_id", None),
            error=err,
        )
        job.mark_failed(err)
        job.result = GenerationResult(
            success=False,
            model=self._model_id,
            error=err,
            cost=self._pick_result_cost(listed_cost, observed_cost),
        )
        return job.result

    def _finalize_completed_no_output(
        self, adapter, pj, job, poll_result, submit_start
    ):
        """Fail a job the provider called COMPLETED but never gave an output URL."""
        error = (
            f"completed_no_output: provider reported COMPLETED without an output "
            f"URL (native_id={getattr(pj, 'native_id', None)}, "
            f"poll_url={self._poll_url_for_orphan(adapter, pj)})"
        )
        failed_poll = PollResult(
            status="FAILED",
            observed_cost=getattr(poll_result, "observed_cost", None),
            error=error,
            raw=getattr(poll_result, "raw", {}) or {},
        )
        return self._finalize_failed(
            adapter, pj, job, failed_poll, submit_start=submit_start
        )

    def _finalize_completed(self, adapter, pj, job, poll_result, submit_start):
        """Fetch and download the output, record COMPLETED, and complete the Job.

        If the fetched final result reports FAILED, this delegates to
        _finalize_failed instead. Exceptions from the result fetch or the
        video download are not caught here.
        """
        GenerationResult, _, _dl = _lazy_imports()
        # Some adapters (atlas, piapi) return the result in the poll payload.
        # Others (fal) require a separate fetch.
        if poll_result.video_url:
            final = poll_result
        else:
            fetch = adapter.build_result_fetch(pj)
            if fetch is None:
                final = adapter.parse_result(poll_result.raw, pj)
            else:
                fetch_resp = _http(
                    fetch.method, fetch.url, fetch.headers, None, timeout=30
                )
                final = adapter.parse_result(fetch_resp, pj)
        if getattr(final, "status", None) == "FAILED":
            return self._finalize_failed(
                adapter, pj, job, final, submit_start=submit_start
            )

        profile = model_profiles.get_profile(self._model_id)
        listed_cost = adapter.compute_cost(pj.duration_s, pj.tier, profile)
        video_bytes = _dl(final.video_url) if final.video_url else None

        latency = int((time.time() - submit_start) * 1000)
        _obs.record_call(
            provider=adapter.provider_id,
            model=self._model_id,
            tier=pj.tier,
            status="COMPLETED",
            duration_s=pj.duration_s,
            listed_cost=listed_cost,
            observed_cost=final.observed_cost,
            latency_ms=latency,
            task_id=pj.native_id,
            shot_id=getattr(job, "_shot_id", None),
            error=None,
        )

        metadata = {
            "provider": adapter.provider_id,
            "tier": pj.tier,
            "request_id": pj.native_id,
            "model_path": pj.native_state.get("model_path"),
            "duration_s": pj.duration_s,
            "resolution": pj.resolution,
            "cost_per_second_estimated": listed_cost / max(pj.duration_s, 1),
            "cost_usd_estimated": listed_cost,
            "cost_usd_billed": final.observed_cost,
        }
        if final.audio_url:
            metadata["audio_url"] = final.audio_url

        # Phase 7 (Bug L): capture provider-side seed + billing fields from the
        # raw fal response. The fal SDK returns `seed` on the final result
        # payload for seeddance-2.0 (verified 2026-05-20 against pipeline-
        # learnings §22). `inference_billed_usd` is fal's billed-cost field
        # (distinct from `cost_usd_estimated` we compute from the rate card).
        # Previously dropped on the floor; now threaded into sidecar provenance
        # via run_result.metadata.seed.
        raw_resp = getattr(final, "raw", None)
        if isinstance(raw_resp, dict):
            if raw_resp.get("seed") is not None:
                metadata["seed"] = raw_resp.get("seed")
            if raw_resp.get("request_id") is not None:
                # Prefer fal's echoed request_id if present; otherwise pj.native_id.
                metadata["request_id"] = raw_resp.get("request_id")
            inf_billed = raw_resp.get("inference_billed_usd")
            if inf_billed is not None:
                metadata["inference_billed_usd"] = inf_billed

        result = GenerationResult(
            success=True,
            video_data=video_bytes,
            video_url=final.video_url,
            model=self._model_id,
            cost=listed_cost,
            metadata=metadata,
        )
        job.mark_complete(result)
        self._verified = True
        return result

    def _finalize_failed(self, adapter, pj, job, poll_result, submit_start):
        """Record a FAILED row, log any configured fallback, and fail the Job.

        The result cost is the provider-reported ``observed_cost`` (0.0 when
        absent).
        """
        GenerationResult, _, _ = _lazy_imports()
        status = getattr(poll_result, "status", None) or "FAILED"
        err = poll_result.error or f"provider reported {status}"
        latency = int((time.time() - submit_start) * 1000)
        _obs.record_call(
            provider=adapter.provider_id,
            model=self._model_id,
            tier=pj.tier,
            status="FAILED",
            duration_s=pj.duration_s,
            listed_cost=None,
            observed_cost=None,
            latency_ms=latency,
            task_id=pj.native_id,
            shot_id=getattr(job, "_shot_id", None),
            error=err,
        )

        # Fallback lookup — for logging only. The caller's original payload is
        # not available here, so this stub (prompt="", no inputs) is used only
        # to ask the registry which fallback is configured. Retries must be
        # orchestrated by the caller re-invoking submit() with the original
        # dict payload. See provider-adapter-refactor/SYNTHESIS.md §5.
        unified = UnifiedVideoPayload(
            prompt="",
            duration_s=pj.duration_s,
            resolution=pj.resolution,
            model_id=self._model_id,
        )
        fb = resolve_fallback(self._model_id, unified)
        if fb is not None:
            fb_adapter, fb_tier = fb
            # The inline retry was never implemented. The old "retrying on
            # fallback" wording claimed a dispatch that never happened and sent
            # the 2026-06-09 EP001 debugging to fal for requests that were
            # never made (REC-122).
            logger.warning(
                "Primary %s failed (%s) — fallback %s/%s is configured but "
                "inline retry is NOT implemented; failing this attempt",
                adapter.provider_id,
                err,
                fb_adapter.provider_id,
                fb_tier,
            )

        # Thread the provider's reported billed cost through — under the
        # REC-122 rule failed results report only provider-confirmed cost,
        # so dropping observed_cost here would undercount real spend
        # (gate-2 finding).
        observed = float(getattr(poll_result, "observed_cost", 0.0) or 0.0)
        result = GenerationResult(
            success=False,
            model=self._model_id,
            error=err,
            cost=observed,
        )
        job.mark_failed(err)
        job.result = result
        return result

    # ------------------------------------------------------------------
    # Convenience
    # ------------------------------------------------------------------

    def reconnect_job(self, job_dict: dict) -> Job:
        """Rehydrate a Job from its dict form and bind it to this client.

        NOTE(review): the adapter handles stored by _submit_via
        (``_provider_adapter`` / ``_provider_job``) are not restored here. Unless
        Job.from_dict restores them (not visible from this file), wait_for_job
        on the returned Job fails with "no adapter bound".
        """
        job = Job.from_dict(job_dict)
        job._client = self
        return job

    def generate(self, package):
        """Blocking convenience: submit() then wait_for_job().

        Non-dict packages are wrapped as ``{"prompt": str(package)}``.
        """
        if isinstance(package, dict):
            payload = package
        else:
            payload = {"prompt": str(package)}
        job = self.submit(payload)
        return self.wait_for_job(job)


__all__ = ["VideoModelClient"]
