from __future__ import annotations

import ast
import inspect
import textwrap
from pathlib import Path
from types import SimpleNamespace

from recoil.execution.step_runner import StepRunner
from recoil.pipeline._lib.dispatch_payload import PayloadContext, build_unified_payload
from recoil.pipeline.core.runners.video_runner import VideoRunner


def _build_i2v_payload(tmp_path: Path, *, include_end_frame: bool) -> dict:
    start_frame = tmp_path / "start.png"
    start_frame.write_bytes(b"start-frame")
    end_frame = tmp_path / "end.png"
    if include_end_frame:
        end_frame.write_bytes(b"end-frame")

    return build_unified_payload(
        PayloadContext(
            project="fixture",
            modality="video_i2v",
            shot_id="EP001_SH01",
            prompt="camera moves through the corridor",
            model_id="seeddance-2.0",
            start_frame_path=start_frame,
            end_frame_path=end_frame if include_end_frame else None,
            duration_s=5,
            aspect_ratio="9:16",
            generate_audio=False,
        )
    )


class _CapturingStepRunner:
    def __init__(self) -> None:
        self.calls: list[dict] = []

    def execute_video(self, *, image_tail=None, **kwargs):
        call = dict(kwargs)
        call["image_tail"] = image_tail
        self.calls.append(call)
        return SimpleNamespace(
            shot_id=kwargs.get("shot_id"),
            success=True,
            final_state="video_complete",
            output_path="/tmp/fake.mp4",
            cost_usd=0.0,
            error=None,
            take_index=0,
            gate_verdict=None,
            model=kwargs.get("model"),
            pipeline="video",
        )


def _video_runner_execute_video_keyword_names() -> set[str]:
    source = textwrap.dedent(inspect.getsource(VideoRunner.run))
    tree = ast.parse(source)
    keywords: set[str] = set()
    for node in ast.walk(tree):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "execute_video"
        ):
            keywords.update(kw.arg for kw in node.keywords if kw.arg is not None)
    return keywords


def test_start_and_end_frame_payload_forwards_image_tail_to_execute_video(tmp_path):
    payload = _build_i2v_payload(tmp_path, include_end_frame=True)
    step_runner = _CapturingStepRunner()

    result = VideoRunner(step_runner).run(payload)

    assert result.success is True
    call = step_runner.calls[0]
    assert payload["image_tail"]
    assert call["image_tail"] == payload["image_tail"]


def test_builder_frame_keys_match_video_runner_execute_video_contract(tmp_path):
    payload = _build_i2v_payload(tmp_path, include_end_frame=True)
    builder_frame_keys = {
        key for key in ("start_frame", "image_tail") if payload.get(key) is not None
    }

    runner_passthrough_keys = _video_runner_execute_video_keyword_names()

    assert builder_frame_keys <= runner_passthrough_keys
    assert "image_tail" in inspect.signature(StepRunner.execute_video).parameters


def test_start_frame_still_reaches_execute_video_for_start_only_payload(tmp_path):
    payload = _build_i2v_payload(tmp_path, include_end_frame=False)
    step_runner = _CapturingStepRunner()

    result = VideoRunner(step_runner).run(payload)

    assert result.success is True
    call = step_runner.calls[0]
    assert call["start_frame"] == Path(payload["start_frame"])
    assert call["image_tail"] is None


# ── Consumption-block guard (closes the reviewer-#2 gap) ────────────────────
# The key-agreement + boundary tests above prove image_tail is PASSED to
# execute_video. This test proves the REAL execute_video BODY CONSUMES it into
# the submitted payload (step_runner.py consumption block) — so deleting that
# block (a "dead param" refactor that the param-existence probe would miss)
# fails loudly. It also links execute_video's param to the payload the adapter
# reads (via _dict_to_unified), closing the untested middle hop.
class _CaptureSubmit(Exception):
    pass


def test_execute_video_body_consumes_image_tail_into_submit_payload(tmp_path, monkeypatch):
    from unittest.mock import MagicMock
    from recoil.execution import video_model_client as vmc
    from recoil.execution.step_types import ProjectPaths
    import base64

    captured: dict = {}

    def _fake_submit(self, payload):
        captured["payload"] = payload
        raise _CaptureSubmit()  # stop right after the consumption block + submit

    monkeypatch.setattr(vmc.VideoModelClient, "submit", _fake_submit, raising=True)

    paths = ProjectPaths(
        project="fixture", project_root=tmp_path,
        frames_dir=tmp_path / "frames", video_dir=tmp_path / "video",
        plans_dir=tmp_path / "plans", previs_dir=tmp_path / "previs",
    )
    for d in (paths.frames_dir, paths.video_dir, paths.plans_dir, paths.previs_dir):
        d.mkdir()
    store = MagicMock(); store.get_shot.return_value = None
    runner = StepRunner(store=store, paths=paths, validate_frames=False)

    start = tmp_path / "start.png"; start.write_bytes(b"\x89PNG\r\n\x1a\n" + b"0" * 64)
    tail_b64 = base64.b64encode(b"\x89PNG\r\n\x1a\n" + b"END" * 64).decode()

    # Mirror the live f2v dispatch shape: start_frame Path + image_tail base64,
    # end_frame Path absent (image_tail is the sole end-frame carrier).
    try:
        runner.execute_video(
            model="kling-o3-pro", shot_id="EP001_SH01", prompt="x",
            start_frame=start, end_frame=None, image_tail=tail_b64,
            duration=5, aspect_ratio="9:16",
        )
    except _CaptureSubmit:
        pass

    assert "payload" in captured, "execute_video never reached client.submit"
    assert captured["payload"].get("image_tail") == tail_b64, (
        "execute_video body did NOT consume image_tail into the submit payload "
        "(consumption block dropped — the adapter would never see the end frame)"
    )
