"""Google Gemini adapter — NBP, Flash, Veo 3.1.

Extracted from GoogleGenaiClient (recoil/execution/api_client.py) in CP-2
of the June 2026 refactor. Preserves exact wire behavior:
  - NBP / Flash (image): synchronous client.models.generate_content
  - Veo 3.1 (video): async generate_videos with polling

Auth: GEMINI_API_KEY environment variable.

Special-case: Google's submit/poll model is different from fal/atlas/piapi.
Image generation is synchronous — the adapter's build_submit() MUST signal
completion to the registry path. The provider-aware path in step_runner
(Phase 6) handles this by checking job.status == "completed" right after
submit and skipping the poll loop.
"""

from __future__ import annotations

import base64
import logging
import os
from typing import Optional

from recoil.execution.providers.base import (
    PollRequest,
    PollResult,
    ProviderJob,
    SubmitRequest,
    UnifiedVideoPayload,
)
from recoil.execution.providers.payload_hints import coerce_to_dict

logger = logging.getLogger(__name__)

try:
    from google import genai
    from google.genai import types as genai_types

    _HAS_GENAI = True
except ImportError:
    genai = None
    genai_types = None
    _HAS_GENAI = False


_VEO_API_MODELS = {"veo-3.1": "veo-3.1-generate-preview"}
_VEO_VALID_DURATIONS = (4, 6, 8)


class GoogleAdapter:
    provider_id = "google"
    # NBP & Flash for keyframes/previz; Veo 3.1 for video. Veo 3 deprioritized
    # 2026-04-09 per JT memory note — keep capability but warn.
    supported_models = ["nbp", "flash", "veo-3.1"]
    auth_env_var = "GEMINI_API_KEY"
    base_url = "https://generativelanguage.googleapis.com"  # informational only
    max_prompt_chars = None
    status = "primary"
    capabilities = {
        "t2v": True,  # Veo 3.1 text-to-video
        "i2v": True,  # Veo 3.1 image-to-video
        "r2v": False,  # not supported
        "end_frame": False,  # Veo 3.1 doesn't accept end frame
        "audio": True,  # Veo can generate audio
        "negative_prompt": False,
        "resolution_480p": False,
        "resolution_720p": True,
        "resolution_1080p": False,
    }

    # Image-generation behaviors are NOT covered by the video-shaped capability
    # keys above. Image use is signaled by payload.hints["modality"] == "image"
    # and dispatched via the `image_synchronous_submit` branch below.

    def __init__(self):
        self._client = None

    def _get_client(self):
        if self._client is None:
            if not _HAS_GENAI:
                raise RuntimeError(
                    "google-genai SDK not installed. pip install google-genai"
                )
            api_key = os.environ.get(self.auth_env_var)
            if not api_key:
                raise RuntimeError(f"{self.auth_env_var} not set in env.")
            self._client = genai.Client(api_key=api_key)
        return self._client

    # ---- submit ----

    def build_submit(self, payload: UnifiedVideoPayload, tier: str) -> SubmitRequest:
        """Build the request descriptor.

        Google's API doesn't fit the POST-then-poll shape cleanly. We
        return a SubmitRequest with a sentinel URL that the
        VideoModelClient recognizes (or, more cleanly, the StepRunner
        executor calls a different code path that the adapter exposes
        directly — see _direct_submit). For Phase 2 we ALSO expose a
        direct-submit method that the executor in Phase 6 will use.
        """
        modality = coerce_to_dict(payload.hints).get("modality", "video")
        if modality == "image":
            # Image jobs short-circuit through _direct_submit.
            return SubmitRequest(
                method="POST",
                url="googleapi://image-synchronous",  # sentinel
                headers={},
                body={
                    "prompt": payload.prompt,
                    "model": payload.model_id,
                    **coerce_to_dict(payload.hints),
                },
            )
        # Video jobs (Veo 3.1)
        if payload.model_id not in _VEO_API_MODELS:
            raise ValueError(
                f"GoogleAdapter cannot dispatch video for model {payload.model_id}"
            )
        if payload.duration_s not in _VEO_VALID_DURATIONS:
            raise ValueError(
                f"Veo 3.1 only accepts durations {_VEO_VALID_DURATIONS}; got {payload.duration_s}"
            )
        return SubmitRequest(
            method="POST",
            url="googleapi://video-veo",  # sentinel
            headers={},
            body={
                "prompt": payload.prompt,
                "model": _VEO_API_MODELS[payload.model_id],
                "duration": payload.duration_s,
                "aspect_ratio": payload.aspect_ratio,
                "image": payload.image,  # raw bytes, file path, or base64
                "image_tail": payload.image_tail,
                "generate_audio": payload.generate_audio,
            },
        )

    def parse_submit(
        self, resp: dict, payload: UnifiedVideoPayload, tier: str
    ) -> ProviderJob:
        # The "resp" passed in is the result of _direct_submit (see below);
        # the registry/VideoModelClient flow won't use this for google because
        # the executor calls direct_submit / direct_wait instead.
        native_id = resp.get("native_id") or resp.get("operation_name") or "unknown"
        return ProviderJob(
            provider_id=self.provider_id,
            model_id=payload.model_id,
            native_id=native_id,
            tier=tier,
            duration_s=payload.duration_s,
            resolution=payload.resolution,
            native_state={
                "modality": coerce_to_dict(payload.hints).get("modality", "video"),
                "operation": resp.get("operation"),
                "image_bytes": resp.get("image_bytes"),  # synchronous image result
                "video_url": resp.get("video_url"),
            },
        )

    # ---- poll ----

    def build_poll(self, job: ProviderJob) -> PollRequest:
        # Image jobs are already complete at submit; poll is a no-op.
        # Video jobs poll via the SDK, NOT via HTTP. The executor handles this.
        return PollRequest(method="GET", url="googleapi://noop", headers={})

    def parse_poll(self, resp: dict, job: ProviderJob) -> PollResult:
        # Image: completed at submit time.
        if job.native_state.get("modality") == "image":
            return PollResult(status="COMPLETED", raw=resp or {})
        # Video (Veo): refresh the SDK operation handle stored in native_state.
        # CP-2 Phase 8/R4 fix: VideoModelClient short-circuits sentinel poll
        # URLs (googleapi://noop) to resp={}, so the operation must come from
        # native_state, not resp. Without this refresh, Veo would IN_PROGRESS
        # forever until timeout (~30 min).
        op = job.native_state.get("operation")
        if op is None:
            # Veo deprioritized 2026-04-09; surface fail-loud rather than hang.
            return PollResult(
                status="FAILED",
                error=(
                    "Veo polling: no operation handle in native_state. "
                    "Full SDK polling deferred to CP-3."
                ),
                raw=resp or {},
            )
        try:
            client = self._get_client()
            op = client.operations.get(name=getattr(op, "name", ""))
            job.native_state["operation"] = op
        except Exception as e:
            return PollResult(
                status="FAILED",
                error=f"Veo operation refresh failed: {e}",
                raw=resp or {},
            )
        if getattr(op, "done", False):
            err = getattr(op, "error", None)
            if err:
                return PollResult(status="FAILED", error=str(err), raw={"operation": op})
            video_url = None
            response = getattr(op, "response", None)
            if response:
                vids = getattr(response, "generated_videos", []) or []
                if vids:
                    vid = getattr(vids[0], "video", None)
                    if vid:
                        video_url = getattr(vid, "uri", None)
            job.native_state["video_url"] = video_url
            return PollResult(status="COMPLETED", raw={"operation": op})
        return PollResult(status="IN_PROGRESS", raw=resp or {})

    # ---- result fetch ----

    def build_result_fetch(self, job: ProviderJob) -> Optional[PollRequest]:
        return None  # google returns the result inline with poll

    def parse_result(self, resp: dict, job: ProviderJob) -> PollResult:
        if job.native_state.get("modality") == "image":
            # The executor will populate raw with the bytes already; surface them.
            return PollResult(
                status="COMPLETED",
                video_url=None,
                raw={"image_bytes": job.native_state.get("image_bytes")},
            )
        # Video — the executor extracts the video URL from the operation result.
        return PollResult(
            status="COMPLETED",
            video_url=job.native_state.get("video_url"),
            raw=resp,
        )

    # ---- cost ----

    def compute_cost(self, duration_s: float, tier: str, profile: dict) -> float:
        # Image cost lives in profile.image_cost (per-image flat fee).
        # Video cost is per-second.
        modality = (profile or {}).get("modality") or "video"
        if modality == "image":
            return float((profile or {}).get("image_cost", 0.0))
        rate = (profile or {}).get("cost_per_second", 0.0)
        return float(rate) * float(duration_s)

    # ---- direct-submit (used by executor; bypasses HTTP transport) ----

    def direct_submit_image(self, payload: UnifiedVideoPayload) -> dict:
        """Synchronous image generation. Returns {image_bytes, native_id, operation}.

        Used by execute_keyframe / execute_previz in Phase 6 instead of the
        HTTP transport path.
        """
        client = self._get_client()
        hints_dict = coerce_to_dict(payload.hints)
        config = (
            genai_types.GenerateContentConfig(
                **(hints_dict.get("genai_config") or {})
            )
            if hints_dict
            else None
        )
        # Prompt parts come from payload.hints["parts"] — assembler builds this.
        parts = hints_dict.get("parts") or [payload.prompt]
        # Tenacity-style retry: caller wraps if needed; behavior must match
        # GoogleGenaiClient (4 attempts, 4–120s expo).
        # For Phase 2 we keep the retry decoration in the executor; this method
        # is the bare API call.
        response = client.models.generate_content(
            model=payload.model_id,
            contents=parts,
            config=config,
        )
        # Extract the first inline_data (image) part.
        image_bytes = None
        for cand in response.candidates or []:
            for part in cand.content.parts or []:
                inline = getattr(part, "inline_data", None)
                if inline and getattr(inline, "data", None):
                    image_bytes = inline.data
                    break
            if image_bytes:
                break
        return {
            "image_bytes": image_bytes,
            "native_id": getattr(response, "response_id", "unknown"),
            "operation": None,
        }

    def direct_submit_video(self, payload: UnifiedVideoPayload) -> dict:
        """Async Veo 3.1 video. Returns {operation, native_id}."""
        client = self._get_client()
        api_model = _VEO_API_MODELS[payload.model_id]
        kwargs: dict = {
            "model": api_model,
            "prompt": payload.prompt,
            "config": genai_types.GenerateVideosConfig(
                duration_seconds=payload.duration_s,
                aspect_ratio=payload.aspect_ratio or "9:16",
                generate_audio=payload.generate_audio,
            ),
        }
        # Image input (i2v)
        if payload.image is not None:
            # GenAI accepts raw bytes via Image.from_bytes
            img_bytes = (
                payload.image
                if isinstance(payload.image, (bytes, bytearray))
                else base64.b64decode(payload.image)
            )
            kwargs["image"] = genai_types.Image(image_bytes=img_bytes)
        operation = client.models.generate_videos(**kwargs)
        return {
            "operation": operation,
            "native_id": getattr(operation, "name", "veo-job"),
        }


ADAPTER = GoogleAdapter()


# ======================================================================
# Backwards-compat shim — CP-2 Phase 8 (spec-review edit #11, locked
# 2026-04-25). The Production Console's review_server.py and the
# pipeline/api/routes/generation.py routes import GoogleGenaiClient by
# class name and call __init__() + is_available() before delegating
# real work to StepRunner. Until those callsites migrate (CP-3+), keep
# the legacy class name reachable so the Console boots.
#
# DO NOT add real generation logic here — all generation flows through
# GoogleAdapter / VideoModelClient now.
# ======================================================================


class GoogleGenaiClient:
    """Legacy class name for the Google Gemini client.

    Backwards-compat shim — quacks like the pre-CP-2 GoogleGenaiClient
    for the two methods that legacy callers actually use:
      - __init__()                  (no-arg or api_key=...)
      - is_available() -> bool

    All real generation should go through StepRunner / VideoModelClient.
    """

    def __init__(self, api_key: Optional[str] = None):
        self._api_key = api_key or os.environ.get("GEMINI_API_KEY")

    def is_available(self) -> bool:
        """Available if google-genai SDK is installed AND a key is configured."""
        return bool(_HAS_GENAI and (self._api_key or os.environ.get("GEMINI_API_KEY")))


__all__ = ["GoogleAdapter", "ADAPTER", "GoogleGenaiClient"]

__all__ = ["GoogleAdapter", "ADAPTER"]
