"""Kling adapter — kling-o3 (elements R2V), kling-v3 (start+end I2V/T2V),
kling-v3-i2v (start-frame I2V), O3/O1 video-to-video edit.

Extracted from FalAiKlingClient (recoil/execution/api_client.py:919-1352)
in CP-2 Phase 3b of the June 2026 refactor. Preserves exact wire behavior:
  - kling-o3 reference-to-video: elements payload + start_image_url
  - kling-v3 image-to-video: image_url (V3) or start_image_url (O3)
  - kling-v3-i2v: I2V variant with start frame only
  - end-frame: end_image_url (NOT tail_image_url - fal.ai change 2026-04-05)
  - audio: generate_audio=True
  - v2v edit (O3/O1): hints["endpoint"]="o3_edit_pro", reference_videos[0]=src, reference_images=refs

Auth: FAL_KEY environment variable (composed via FalTransport).

Implementation note: this adapter COMPOSES providers/fal_transport.py for
queue auth, submit, poll, result fetch, and per-instance upload caching.
Per Q3 spec-review lock (2026-04-25), Wan + Seedance + Kling all share
this transport - adapter classes carry only the per-model translation.
"""

from __future__ import annotations

import base64
import json
import logging
import os
from typing import Optional

from recoil.execution.providers.base import (
    PollRequest,
    PollResult,
    ProviderJob,
    SubmitRequest,
    UnifiedVideoPayload,
)
from recoil.execution.providers.fal_transport import FalTransport  # Q3 lock - composition
from recoil.execution.providers.payload_hints import coerce_to_dict

logger = logging.getLogger(__name__)


# Endpoint table - verbatim from api_client.py:945-953 (FalAiKlingClient.ENDPOINTS)
# Keys are short aliases (also accepted as hints["endpoint"] by
# _resolve_endpoint); values are fal.ai model paths that get appended to the
# adapter's base_url when the submit URL is built.
_ENDPOINTS = {
    "i2v_standard": "fal-ai/kling-video/v3/standard/image-to-video",
    "i2v_pro": "fal-ai/kling-video/v3/pro/image-to-video",
    "t2v_standard": "fal-ai/kling-video/v3/standard/text-to-video",
    "o3_i2v_standard": "fal-ai/kling-video/o3/standard/image-to-video",
    "o3_ref_standard": "fal-ai/kling-video/o3/standard/reference-to-video",
    "o3_ref_pro": "fal-ai/kling-video/o3/pro/reference-to-video",
    # Video-to-video edit endpoints (O3 = Kling 3.0 Omni, O1 = older Omni)
    "o3_edit_standard": "fal-ai/kling-video/o3/standard/video-to-video/edit",
    "o3_edit_pro": "fal-ai/kling-video/o3/pro/video-to-video/edit",
    "o1_edit_standard": "fal-ai/kling-video/o1/standard/video-to-video/edit",
}


def _resolve_endpoint(payload: UnifiedVideoPayload) -> str:
    """Pick the fal.ai model path for *payload*.

    Cite: api_client.py:985-1005 for the original logic, adapted to read
    from UnifiedVideoPayload + payload.hints instead of a plain dict.

    Resolution order:
      1. hints["endpoint"], if set: looked up as an _ENDPOINTS alias and
         otherwise returned verbatim (so a full model path may be passed).
      2. kling-o3 with hints["elements"]: reference-to-video (pro when
         hints["mode"] == "pro", else standard).
      3. kling-o3 with an image: O3 image-to-video (standard only).
      4. any other model with an image: V3 image-to-video (pro/standard).
      5. otherwise: V3 text-to-video (standard).

    An image counts as present when payload.image is not None or
    hints["image_url"] is truthy.
    """
    opts = coerce_to_dict(payload.hints)

    override = opts.get("endpoint")
    if override:
        # Unknown aliases fall through unchanged: treated as a raw model path.
        return _ENDPOINTS.get(override, override)

    wants_pro = opts.get("mode", "standard") == "pro"
    image_present = payload.image is not None or opts.get("image_url")

    if payload.model_id == "kling-o3":
        if opts.get("elements"):
            alias = "o3_ref_pro" if wants_pro else "o3_ref_standard"
            return _ENDPOINTS[alias]
        if image_present:
            return _ENDPOINTS["o3_i2v_standard"]

    if not image_present:
        # No start frame anywhere -> text-to-video.
        return _ENDPOINTS["t2v_standard"]

    # kling-v3 / kling-v3-i2v with a start frame.
    alias = "i2v_pro" if wants_pro else "i2v_standard"
    return _ENDPOINTS[alias]


def _frame_key_for_endpoint(model_path: str) -> str:
    """O3 reference endpoints use start_image_url; V3 I2V uses image_url.

    Cite: api_client.py:1043-1060.
    """
    return "start_image_url" if "o3_ref" in model_path else "image_url"


class KlingAdapter:
    """Translate UnifiedVideoPayload requests to and from the fal.ai Kling queue API.

    The adapter only carries the per-model translation: it builds
    SubmitRequest / PollRequest objects and parses provider responses into
    ProviderJob / PollResult. Auth headers and local-file uploads are
    delegated to the composed FalTransport.
    """

    provider_id = "kling"
    supported_models = ["kling-o3", "kling-v3", "kling-v3-i2v"]
    auth_env_var = "FAL_KEY"
    base_url = "https://queue.fal.run"
    max_prompt_chars = None
    status = "primary"
    capabilities = {
        "t2v": True,  # kling-v3 t2v_standard
        "i2v": True,  # all three models
        "r2v": True,  # kling-o3 reference-to-video (elements)
        "end_frame": True,  # kling-v3 supports end_image_url
        "audio": True,  # generate_audio
        "negative_prompt": True,
        "resolution_480p": False,
        "resolution_720p": True,
        "resolution_1080p": False,
        "v2v_edit": True,  # O3/O1 video-to-video edit
    }

    def __init__(self):
        # Per-instance transport: holds the FAL_KEY auth + upload cache.
        # Registry caches the adapter, so this transport (and its cache)
        # survive across submits in one session.
        self._transport = FalTransport(auth_env_var=self.auth_env_var)
        # Public alias of the same transport object.
        self.transport = self._transport

    # ---- helpers ----

    @staticmethod
    def _image_ref(value) -> Optional[str]:
        """Normalise a frame value into a string usable in an image URL field.

        - bytes / bytearray: base64-encoded and wrapped as a PNG data URI.
        - str already starting with "data:", "http://" or "https://":
          returned unchanged (prefixing it again would corrupt it).
        - any other non-empty str: assumed to be raw base64 and wrapped as a
          PNG data URI.
        - anything else (None, empty input, unsupported type): None.
        """
        if isinstance(value, (bytes, bytearray)):
            encoded = base64.b64encode(bytes(value)).decode()
            return f"data:image/png;base64,{encoded}" if encoded else None
        if isinstance(value, str) and value:
            # ':' is not in the base64 alphabet, so these prefixes cannot be
            # mistaken for raw base64 data.
            if value.startswith(("data:", "http://", "https://")):
                return value
            return f"data:image/png;base64,{value}"
        return None

    def _upload_if_local(self, ref):
        """Upload *ref* via the transport when it is a non-http string.

        Strings starting with "http" and non-string values are returned
        unchanged; every other string is passed to the transport's
        upload_path and its result is returned.
        """
        if isinstance(ref, str) and not ref.startswith("http"):
            return self._transport.upload_path(ref)
        return ref

    # ---- submit ----

    def build_submit(self, payload: UnifiedVideoPayload, tier: str) -> SubmitRequest:
        """Build the POST request that enqueues a generation or edit job.

        Args:
            payload: Unified request; payload.hints may carry "endpoint",
                "mode", "elements", "image_url", "video_url", "keep_audio".
            tier: Accepted for interface compatibility; not read here.

        Returns:
            SubmitRequest targeting base_url + the resolved model path.

        Raises:
            ValueError: reference_images were supplied for a non-edit
                endpoint without hints["elements"], or an edit request has
                no source video.
        """
        model_path = _resolve_endpoint(payload)
        hints = coerce_to_dict(payload.hints)

        # Video-to-video edit mode — different wire shape from I2V/T2V.
        if "video-to-video/edit" in model_path:
            body = self._build_edit_body(payload, hints)
        else:
            # Fail loudly rather than silently dropping the caller's refs.
            if payload.reference_images and not hints.get("elements"):
                raise ValueError(
                    "kling generation: reference_images require hints['elements']; "
                    "non-edit endpoints would drop reference_images"
                )
            body = self._build_generation_body(payload, model_path, hints)

        url = f"{self.base_url}/{model_path}"
        headers = self._transport.headers()
        return SubmitRequest(method="POST", url=url, headers=headers, body=body)

    def _build_edit_body(self, payload: UnifiedVideoPayload, hints: dict) -> dict:
        """Wire body for video-to-video edit endpoints (O1/O3).

        Required: video_url (hints["video_url"] wins, else
        reference_videos[0]). Optional: image_urls (from reference_images),
        keep_audio. No duration or aspect_ratio is sent for edit requests.

        Raises:
            ValueError: no source video could be determined.
        """
        video_url = hints.get("video_url")
        if not video_url and payload.reference_videos:
            video_url = self._upload_if_local(payload.reference_videos[0])
        if not video_url:
            raise ValueError(
                "kling edit: no source video — pass reference_videos[0] or hints['video_url']"
            )

        body: dict = {
            "prompt": payload.prompt or "",
            "video_url": video_url,
        }

        if payload.reference_images:
            body["image_urls"] = [
                self._upload_if_local(ref) for ref in payload.reference_images
            ]

        # Only forwarded when the caller set it explicitly (True or False).
        if hints.get("keep_audio") is not None:
            body["keep_audio"] = bool(hints["keep_audio"])

        return body

    def _build_generation_body(
        self, payload: UnifiedVideoPayload, model_path: str, hints: dict
    ) -> dict:
        """Wire body for I2V / T2V / R2V generation endpoints.

        The start frame comes from payload.image, falling back to
        hints["image_url"] (which _resolve_endpoint already treats as "an
        image is present"). Frames that are empty or of an unsupported type
        are omitted from the body.
        """
        frame_key = _frame_key_for_endpoint(model_path)
        body: dict = {
            "prompt": payload.prompt or "",
            "duration": str(payload.duration_s),
            "aspect_ratio": payload.aspect_ratio or "9:16",
        }
        if payload.negative_prompt:
            body["negative_prompt"] = payload.negative_prompt
        if payload.generate_audio:
            body["generate_audio"] = True

        # Start frame (cite api_client.py:1049-1060).
        start_ref = self._image_ref(payload.image)
        if start_ref is None:
            # Without this fallback an image-to-video endpoint could be
            # selected from the hint while the request carried no image.
            hinted = hints.get("image_url")
            if isinstance(hinted, str) and hinted:
                start_ref = hinted
        if start_ref is not None:
            body[frame_key] = start_ref

        # End frame - end_image_url (cite api_client.py:1062-1073)
        end_ref = self._image_ref(payload.image_tail)
        if end_ref is not None:
            body["end_image_url"] = end_ref

        # Elements (kling-o3 reference-to-video)
        elements = hints.get("elements")
        if elements:
            body["elements"] = elements

        return body

    def parse_submit(
        self, resp: dict, payload: UnifiedVideoPayload, tier: str
    ) -> ProviderJob:
        """Turn the submit response into a ProviderJob.

        status_url / response_url are taken from the response when present
        and otherwise derived from base_url, the model path and request_id.

        Raises:
            RuntimeError: the response has no request_id.
        """
        request_id = resp.get("request_id")
        if not request_id:
            raise RuntimeError(
                f"kling adapter: no request_id in submit response: "
                f"{resp.get('detail', resp)}"
            )
        model_path = _resolve_endpoint(payload)
        return ProviderJob(
            provider_id=self.provider_id,
            model_id=payload.model_id,
            native_id=request_id,
            tier=tier,
            duration_s=payload.duration_s,
            resolution=payload.resolution,
            native_state={
                "model_path": model_path,
                "status_url": resp.get("status_url")
                or f"{self.base_url}/{model_path}/requests/{request_id}/status",
                "response_url": resp.get("response_url")
                or f"{self.base_url}/{model_path}/requests/{request_id}",
            },
        )

    # ---- poll ----

    def build_poll(self, job: ProviderJob) -> PollRequest:
        """Build the GET request for the job's stored status_url."""
        return PollRequest(
            method="GET",
            url=job.native_state["status_url"],
            headers=self._transport.headers(),
        )

    def parse_poll(self, resp: dict, job: ProviderJob) -> PollResult:
        """Map the provider status to COMPLETED / FAILED / IN_PROGRESS.

        The status is compared case-insensitively; a missing or unrecognised
        status is reported as IN_PROGRESS.
        """
        status = (resp.get("status") or "unknown").upper()
        if status == "COMPLETED":
            return PollResult(status="COMPLETED", raw=resp)
        if status == "FAILED":
            return PollResult(
                status="FAILED",
                error=resp.get("error") or "fal.ai Kling generation failed",
                raw=resp,
            )
        return PollResult(status="IN_PROGRESS", raw=resp)

    # ---- result fetch ----

    def build_result_fetch(self, job: ProviderJob) -> Optional[PollRequest]:
        """Build the GET request for the job's stored response_url."""
        return PollRequest(
            method="GET",
            url=job.native_state["response_url"],
            headers=self._transport.headers(),
        )

    def parse_result(self, resp: dict, job: ProviderJob) -> PollResult:
        """Extract video (and optional audio) URLs from the result payload.

        A result without video.url is reported as FAILED, with up to 500
        characters of the JSON-serialised response in the error text.
        """
        video = resp.get("video") or {}
        audio = resp.get("audio") or {}
        video_url = video.get("url")
        if not video_url:
            return PollResult(
                status="FAILED",
                error=(
                    "provider returned COMPLETED with no video_url: "
                    + json.dumps(resp, default=str, sort_keys=True)[:500]
                ),
                raw=resp,
            )
        return PollResult(
            status="COMPLETED",
            video_url=video_url,
            audio_url=audio.get("url"),
            observed_cost=None,
            raw=resp,
        )

    # ---- cost ----

    def compute_cost(self, duration_s: float, tier: str, profile: dict) -> float:
        """Estimate cost as profile["cost_per_second"] * duration_s.

        Args:
            duration_s: Clip length in seconds.
            tier: Not read by this method; any tier- or audio-specific rate
                has to be reflected in the profile the caller passes.
            profile: Model profile dict (cite: model_profiles.json kling-o3 /
                kling-v3 entries); may be None or lack the rate.

        Returns:
            Estimated cost; uses 0.126 per second when no rate is given.
        """
        rate = (profile or {}).get("cost_per_second")
        if rate is None:
            rate = 0.126  # legacy fallback (matches kling-v3 standard)
        return float(rate) * float(duration_s)


# Module-level adapter instance, constructed once when this module is imported.
ADAPTER = KlingAdapter()


# ======================================================================
# Backwards-compat shims — CP-2 Phase 8 (spec-review edit #11, locked
# 2026-04-25). The Production Console (review_server.py:7297, 7310) and
# pipeline/api/routes/generation.py (lines 2243, 2254) import KlingClient
# and FalAiKlingClient by class name and call __init__() + is_available()
# before delegating real work to StepRunner. Until those callsites
# migrate (CP-3+), keep the legacy class names reachable.
#
# DO NOT add real generation logic here — all generation flows through
# KlingAdapter / VideoModelClient now.
# ======================================================================


class KlingClient:
    """Availability-probe shim kept under the legacy direct-Kling class name.

    Before CP-2 this class talked to api.klingai.com with JWT auth built
    from KLING_ACCESS_KEY + KLING_SECRET_KEY. The canonical Kling path now
    goes through fal.ai via KlingAdapter; this class remains only so the
    Production Console's "kling-v3-direct" / "kling-o3-direct" branches can
    import it and check availability. It performs no generation.
    """

    def __init__(
        self,
        access_key: Optional[str] = None,
        secret_key: Optional[str] = None,
    ):
        # Explicit arguments win; a missing/empty one falls back to the
        # environment as it is at construction time.
        env = os.environ
        self._access_key = access_key if access_key else env.get("KLING_ACCESS_KEY")
        self._secret_key = secret_key if secret_key else env.get("KLING_SECRET_KEY")

    def is_available(self) -> bool:
        """Return True only when both an access key and a secret key resolve.

        Each credential is taken from the value captured in __init__, or,
        if that is empty, from the environment at call time.
        """
        access = self._access_key or os.environ.get("KLING_ACCESS_KEY")
        if not access:
            return False
        secret = self._secret_key or os.environ.get("KLING_SECRET_KEY")
        return bool(secret)


class FalAiKlingClient:
    """Availability-probe shim kept under the legacy fal.ai Kling class name.

    Before CP-2 this class submitted jobs to queue.fal.run using FAL_KEY.
    Generation now runs through KlingAdapter via VideoModelClient; this
    class remains so review_server.py / generation.py can still call
    is_available() before starting StepRunner. It performs no generation.
    """

    def __init__(self, api_key: Optional[str] = None):
        # An explicit key wins; otherwise capture FAL_KEY as it is right now.
        if not api_key:
            api_key = os.environ.get("FAL_KEY")
        self._api_key = api_key

    def is_available(self) -> bool:
        """Return True when a key was captured or FAL_KEY is set at call time."""
        if self._api_key:
            return True
        return bool(os.environ.get("FAL_KEY"))


# Single export list. The legacy shim classes are included so that
# star-imports of this module keep exposing them; a second, shorter
# assignment here would silently replace this list and drop them.
__all__ = ["KlingAdapter", "ADAPTER", "KlingClient", "FalAiKlingClient"]
