"""CP-8 Phase 5 — Workflow integration for audio_t2a + lipsync_post.

Validates that a Workflow containing an audio_t2a step (step_id `tts`) and a
downstream lipsync_post step (depends_on=["tts"]) executes top-down with both
steps succeeded, and that the canonical pre_step hook resolves audio_path from
the upstream step's receipt.

ALL HTTP transport mocked via the `_transport=` payload key.
"""

import json
import sys
import pathlib
from types import SimpleNamespace

sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[3]))
from recoil.core.paths import ensure_pipeline_importable  # noqa: E402
ensure_pipeline_importable()


from recoil.pipeline.core.dispatch_context import DispatchContext  # noqa: E402
from recoil.pipeline.core.workflow import Workflow, WorkflowStep  # noqa: E402


# ── Stub StepRunner ─────────────────────────────────────────────────────


def _step_result(**overrides):
    """Build a stand-in StepRunner result object.

    Mirrors the StepRunner result shape used by ImageRunner/VideoRunner tests.
    The defaults describe a successful still-keyframe result; each keyword
    argument replaces the default of the same name, or is added as an extra
    attribute when no such default exists.

    Returns:
        A ``SimpleNamespace`` exposing the merged fields as attributes.
    """
    defaults = {
        "shot_id": "X",
        "success": True,
        "final_state": "keyframe_generated",
        "output_path": "/tmp/x.png",
        "cost_usd": 0.04,
        "error": None,
        "take_index": 0,
        "gate_verdict": None,
        "model": "nbp",
        "pipeline": "still",
    }
    # Later unpacking wins, so caller overrides take precedence over defaults.
    return SimpleNamespace(**{**defaults, **overrides})


class _StubStepRunner:
    """Recording stand-in for the real StepRunner.

    Same shape as test_workflow_dispatch_integration._StubStepRunner: every
    ``execute_*`` call is logged on ``self.calls`` as a ``(kind, kwargs)``
    tuple and answered with a canned successful result.
    """

    def __init__(self):
        # NOTE(review): presumably read or overwritten by the dispatch layer —
        # confirm against the dispatcher; nothing in this file touches it.
        self._dispatch_path = "unknown"
        self.calls: list[tuple[str, dict]] = []

    def _record(self, kind, kwargs):
        """Log one call, storing a shallow copy of its keyword arguments."""
        self.calls.append((kind, {**kwargs}))

    def execute_keyframe(self, **kw):
        """Record a keyframe call and return the default still result."""
        self._record("keyframe", kw)
        return _step_result()

    def execute_video(self, **kw):
        """Record a video call and return a canned i2v video result."""
        self._record("video", kw)
        video_fields = {
            "output_path": "/tmp/v.mp4",
            "final_state": "video_complete",
            "cost_usd": 0.20,
            "model": "seeddance-2.0",
            "pipeline": "i2v",
        }
        return _step_result(**video_fields)


# ── Fake HTTP transports (audio + lipsync) ──────────────────────────────


class _FakeResponse:
    """In-memory HTTP response double.

    Exposes ``status``, ``headers`` and ``read()``, and can be used as a
    context manager that yields itself.
    """

    def __init__(self, body: bytes, status: int = 200, headers=None):
        self.status = status
        # Any falsy ``headers`` (None or an empty mapping) falls back to a
        # fixed request-id header.
        self.headers = headers if headers else {"request-id": "req_test_123"}
        self._body = body

    def read(self) -> bytes:
        """Return the full body passed at construction time."""
        return self._body

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Returning False lets any exception raised in the ``with`` body
        # propagate to the caller.
        return False


def _make_audio_transport(audio_bytes: bytes = b"FAKE_MP3_BYTES"):
    """Return a fake audio transport that always succeeds.

    The returned callable ignores its request arguments and answers every
    call with a ``_FakeResponse`` whose body is ``audio_bytes``.
    """

    def _serve_audio(url, *, headers, body, timeout):
        return _FakeResponse(audio_bytes)

    return _serve_audio


def _make_audio_failing_transport():
    """Return a fake audio transport whose every call raises ``AuthError``.

    Used to make the audio dispatch fail immediately. The provider module is
    imported inside the factory rather than at module import time.
    """
    from recoil.execution.providers import elevenlabs as elevenlabs_provider

    def _reject(url, *, headers, body, timeout):
        raise elevenlabs_provider.AuthError("401 invalid key")

    return _reject


def _make_lipsync_5step_transport(
    *,
    job_id: str = "job_xyz",
    output_bytes: bytes = b"FAKE_LIPSYNCED_MP4",
    duration_s: float = 4.2,
):
    """Return a stateful fake transport for the lipsync HTTP flow.

    The returned callable routes on URL and method:

    * any URL containing ``/v2/upload`` -> JSON with a CDN file URL;
    * ``POST`` to a URL ending in ``/v2/generate`` -> JSON ``{"id": job_id}``;
    * ``GET`` on a URL containing ``/v2/generate/`` -> ``PROCESSING`` on the
      first poll, then ``COMPLETED`` with an output URL and ``duration_s``;
    * any URL containing ``output_v.mp4`` -> the raw ``output_bytes``.

    Anything else raises ``AssertionError`` so unexpected requests fail loudly.
    """
    polls_seen = 0

    def _json_reply(document):
        # Every JSON endpoint answers with a UTF-8 encoded body.
        return _FakeResponse(json.dumps(document).encode("utf-8"))

    def _transport(url, *, headers, body, method="GET", timeout=60.0):
        nonlocal polls_seen

        if "/v2/upload" in url:
            return _json_reply({"url": "https://cdn.sync.so/file_xyz"})

        if method == "POST" and url.endswith("/v2/generate"):
            return _json_reply({"id": job_id})

        if method == "GET" and "/v2/generate/" in url:
            polls_seen += 1
            if polls_seen > 1:
                return _json_reply({
                    "status": "COMPLETED",
                    "outputUrl": "https://cdn.sync.so/output_v.mp4",
                    "duration_s": duration_s,
                })
            # The first poll reports the job as still running.
            return _json_reply({"status": "PROCESSING"})

        if "output_v.mp4" in url:
            return _FakeResponse(output_bytes)

        raise AssertionError(f"unexpected URL in fake transport: {url}")

    return _transport


# ── Fixtures ────────────────────────────────────────────────────────────


def _make_video_input(tmp_path):
    """Write a placeholder carrier video into ``tmp_path`` and return its path.

    Lipsync needs a real video file on disk; the audio side is produced by the
    audio step instead.
    """
    carrier = tmp_path.joinpath("carrier.mp4")
    carrier.write_bytes(b"FAKE CARRIER VIDEO BYTES")
    return carrier


def _audio_payload(tmp_path, **overrides):
    """Return an audio_t2a payload dict wired to a succeeding fake transport.

    Output goes under ``tmp_path / "audio_out"``. Keyword ``overrides``
    replace or extend the default keys (e.g. to swap in a failing
    ``_transport``).
    """
    defaults = {
        "shot_id": "EP001_SH02",
        "text": "Hello world",
        "voice_id": "voice_xyz",
        "model": "eleven_multilingual_v2",
        "output_dir": str(tmp_path / "audio_out"),
        "_transport": _make_audio_transport(),
    }
    return {**defaults, **overrides}


def _lipsync_payload(tmp_path, **overrides):
    """Return a lipsync_post payload dict wired to the fake lipsync transport.

    The default payload deliberately has NO ``audio_path``: the pre_step hook
    is expected to resolve it from the upstream `tts` step's receipt. Tests
    that want an explicit audio file pass ``audio_path`` via ``overrides``.
    A carrier video is written into ``tmp_path`` as a side effect.
    """
    carrier_video = _make_video_input(tmp_path)
    defaults = {
        "shot_id": "EP001_SH02",
        "video_path": str(carrier_video),
        "model": "lipsync-2.0",
        "output_dir": str(tmp_path / "lipsync_out"),
        "_transport": _make_lipsync_5step_transport(),
        "poll_interval_s": 0.0,
    }
    return {**defaults, **overrides}


# ── Pre_step hook (LOCKED-3 from spec lines 2121-2148) ──────────────────


def _audio_to_lipsync_hook(step, workflow):
    """Pre_step hook: feed the `tts` step's output audio into a lipsync step.

    For a ``lipsync_post`` step with no (or an empty) ``audio_path`` in its
    payload, copies ``output_path`` from the upstream `tts` step's successful
    receipt into ``step.payload["audio_path"]``. In every other situation the
    hook leaves the step untouched: other modalities, a caller-supplied
    ``audio_path``, a missing `tts` step or receipt, a failed run, or an empty
    output path.

    Test-suite-only — see BUILD_SPEC_CP8.md lines 2121-2148.
    """
    if step.modality != "lipsync_post":
        return

    payload = step.payload
    if payload.get("audio_path"):
        # Caller already supplied an audio file; it takes precedence.
        return

    upstream = workflow.get_step("tts")
    receipt = None if upstream is None else upstream.receipt
    if receipt is None:
        return

    outcome = receipt.run_result
    if outcome.success and outcome.output_path:
        payload["audio_path"] = outcome.output_path


# ── Tests ───────────────────────────────────────────────────────────────


def test_two_step_workflow_audio_then_lipsync_succeeds(tmp_path):
    """Happy path: audio_t2a (`tts`) followed by lipsync_post depending on it.

    The pre_step hook supplies audio_path; both steps must end 'succeeded',
    carry receipts of the right modality, and the lipsync output must exist
    on disk.
    """
    context = DispatchContext(
        caller_id="workflow_test",
        step_runner=_StubStepRunner(),
        project="tartarus",
        episode=1,
        receipts_log_path=str(tmp_path / "receipts.jsonl"),
    )
    tts = WorkflowStep(
        step_id="tts",
        modality="audio_t2a",
        payload=_audio_payload(tmp_path),
    )
    lipsync = WorkflowStep(
        step_id="lipsync",
        modality="lipsync_post",
        payload=_lipsync_payload(tmp_path),
        depends_on=["tts"],
    )
    wf = Workflow(workflow_id="wf_audio_lipsync", steps=[tts, lipsync])

    wf.run(context=context, pre_step=_audio_to_lipsync_hook)

    audio_step = wf.steps[0]
    lipsync_step = wf.steps[1]
    assert audio_step.status == "succeeded"
    assert lipsync_step.status == "succeeded"
    assert audio_step.receipt.modality == "audio_t2a"
    assert lipsync_step.receipt.modality == "lipsync_post"

    lipsync_result = lipsync_step.receipt.run_result
    assert lipsync_result.success is True
    assert pathlib.Path(lipsync_result.output_path).is_file()


def test_audio_failure_skips_lipsync(tmp_path):
    """A failing audio step causes the dependent lipsync step to be skipped.

    The audio transport raises AuthError, so `tts` ends 'failed'; the lipsync
    step must end 'skipped' with an error mentioning the upstream failure.
    """
    context = DispatchContext(
        caller_id="workflow_test",
        step_runner=_StubStepRunner(),
        receipts_log_path="DISABLED",
    )
    failing_audio_payload = _audio_payload(
        tmp_path,
        _transport=_make_audio_failing_transport(),
    )
    steps = [
        WorkflowStep(
            step_id="tts",
            modality="audio_t2a",
            payload=failing_audio_payload,
        ),
        WorkflowStep(
            step_id="lipsync",
            modality="lipsync_post",
            payload=_lipsync_payload(tmp_path),
            depends_on=["tts"],
        ),
    ]
    wf = Workflow(workflow_id="wf_audio_fail", steps=steps)

    wf.run(context=context, pre_step=_audio_to_lipsync_hook)

    assert wf.steps[0].status == "failed"
    skipped_step = wf.steps[1]
    assert skipped_step.status == "skipped"
    # ``error`` may be None, so normalise to "" before the substring check.
    assert "upstream dependency failed" in (skipped_step.error or "")


def test_pre_step_hook_resolves_audio_path_from_upstream_receipt(tmp_path):
    """The hook fills in audio_path from the `tts` receipt during the run.

    The lipsync payload starts without audio_path; after the workflow runs
    with the hook, it must equal the tts receipt's run_result.output_path and
    point at a real file.
    """
    context = DispatchContext(
        caller_id="workflow_test",
        step_runner=_StubStepRunner(),
        receipts_log_path="DISABLED",
    )
    payload_for_lipsync = _lipsync_payload(tmp_path)
    # Pre-condition: nothing has supplied an audio file yet.
    assert "audio_path" not in payload_for_lipsync

    tts = WorkflowStep(
        step_id="tts",
        modality="audio_t2a",
        payload=_audio_payload(tmp_path),
    )
    lipsync = WorkflowStep(
        step_id="lipsync",
        modality="lipsync_post",
        payload=payload_for_lipsync,
        depends_on=["tts"],
    )
    wf = Workflow(workflow_id="wf_hook", steps=[tts, lipsync])

    wf.run(context=context, pre_step=_audio_to_lipsync_hook)

    upstream = wf.get_step("tts")
    downstream = wf.get_step("lipsync")
    assert upstream.receipt is not None
    audio_path = downstream.payload["audio_path"]
    assert audio_path == upstream.receipt.run_result.output_path
    assert pathlib.Path(audio_path).is_file()
    assert downstream.status == "succeeded"


def test_executed_workflow_round_trip_preserves_receipts(tmp_path):
    """Serialising an executed workflow and loading it back keeps receipts.

    ``Workflow.from_dict(wf.to_dict())`` must reproduce ids, statuses and both
    receipts (CP-6 contract: receipts ARE serialized).
    """
    context = DispatchContext(
        caller_id="workflow_test",
        step_runner=_StubStepRunner(),
        project="tartarus",
        episode=1,
        receipts_log_path=str(tmp_path / "receipts.jsonl"),
    )
    tts = WorkflowStep(
        step_id="tts",
        modality="audio_t2a",
        payload=_audio_payload(tmp_path),
    )
    lipsync = WorkflowStep(
        step_id="lipsync",
        modality="lipsync_post",
        payload=_lipsync_payload(tmp_path),
        depends_on=["tts"],
    )
    wf = Workflow(workflow_id="wf_rt", steps=[tts, lipsync])
    wf.run(context=context, pre_step=_audio_to_lipsync_hook)

    serialized = wf.to_dict()
    restored = Workflow.from_dict(serialized)

    assert restored.workflow_id == "wf_rt"
    restored_tts = restored.steps[0]
    restored_lipsync = restored.steps[1]
    assert restored_tts.step_id == "tts"
    assert restored_lipsync.step_id == "lipsync"
    assert restored_tts.status == "succeeded"
    assert restored_lipsync.status == "succeeded"
    assert restored_tts.receipt is not None
    assert restored_lipsync.receipt is not None
    assert restored_tts.receipt.modality == "audio_t2a"
    assert restored_lipsync.receipt.modality == "lipsync_post"
    assert restored_lipsync.receipt.provenance["workflow_step_id"] == "lipsync"


def test_workflow_id_and_step_id_stamped_on_receipt_provenance(tmp_path):
    """Each receipt's provenance records the workflow id and its own step id."""
    context = DispatchContext(
        caller_id="workflow_test",
        step_runner=_StubStepRunner(),
        receipts_log_path="DISABLED",
    )
    tts = WorkflowStep(
        step_id="tts",
        modality="audio_t2a",
        payload=_audio_payload(tmp_path),
    )
    lipsync = WorkflowStep(
        step_id="lipsync",
        modality="lipsync_post",
        payload=_lipsync_payload(tmp_path),
        depends_on=["tts"],
    )
    wf = Workflow(workflow_id="canonical_wf_id", steps=[tts, lipsync])

    wf.run(context=context, pre_step=_audio_to_lipsync_hook)

    audio_provenance = wf.steps[0].receipt.provenance
    lipsync_provenance = wf.steps[1].receipt.provenance
    assert audio_provenance["workflow_id"] == "canonical_wf_id"
    assert audio_provenance["workflow_step_id"] == "tts"
    assert lipsync_provenance["workflow_id"] == "canonical_wf_id"
    assert lipsync_provenance["workflow_step_id"] == "lipsync"


def test_idempotent_re_run_resets_state_and_succeeds_again(tmp_path):
    """Running the same workflow twice succeeds both times with new receipts.

    Per the CP-6 documented contract (see
    test_rerun_clears_stale_execution_state), ``wf.run`` clears stale
    execution state on every step. Both runs must end 'succeeded' and the
    second run's receipt objects must differ from the first run's.
    """
    context = DispatchContext(
        caller_id="workflow_test",
        step_runner=_StubStepRunner(),
        receipts_log_path="DISABLED",
    )
    tts = WorkflowStep(
        step_id="tts",
        modality="audio_t2a",
        payload=_audio_payload(tmp_path),
    )
    lipsync = WorkflowStep(
        step_id="lipsync",
        modality="lipsync_post",
        payload=_lipsync_payload(tmp_path),
        depends_on=["tts"],
    )
    wf = Workflow(workflow_id="wf_rerun", steps=[tts, lipsync])

    # ── Run 1 ──
    wf.run(context=context, pre_step=_audio_to_lipsync_hook)
    receipt_tts_run1 = wf.steps[0].receipt
    receipt_lipsync_run1 = wf.steps[1].receipt
    assert receipt_tts_run1 is not None
    assert receipt_lipsync_run1 is not None
    assert wf.steps[0].status == "succeeded"
    assert wf.steps[1].status == "succeeded"

    # ── Run 2 ──
    # The hook wrote audio_path into the lipsync payload during run 1, so
    # start again from a pristine payload and let the hook re-resolve it.
    wf.steps[1].payload = _lipsync_payload(tmp_path)
    wf.run(context=context, pre_step=_audio_to_lipsync_hook)
    assert wf.steps[0].status == "succeeded"
    assert wf.steps[1].status == "succeeded"
    # Identity check: each step must now hold a different receipt object.
    assert wf.steps[0].receipt is not receipt_tts_run1
    assert wf.steps[1].receipt is not receipt_lipsync_run1


def test_mixed_modality_3_step_workflow_all_succeed(tmp_path):
    """image_t2i + audio_t2a + lipsync_post in one workflow all succeed.

    The image step goes through the StepRunner stub; the audio and lipsync
    steps use mocked transports.
    """
    context = DispatchContext(
        caller_id="workflow_test",
        step_runner=_StubStepRunner(),
        receipts_log_path="DISABLED",
    )
    keyframe = WorkflowStep(
        step_id="kf",
        modality="image_t2i",
        payload=dict(shot_id="X", prompt="p", model="nbp", aspect_ratio="9_16"),
    )
    tts = WorkflowStep(
        step_id="tts",
        modality="audio_t2a",
        payload=_audio_payload(tmp_path),
    )
    lipsync = WorkflowStep(
        step_id="lipsync",
        modality="lipsync_post",
        payload=_lipsync_payload(tmp_path),
        depends_on=["tts"],
    )
    wf = Workflow(workflow_id="wf_mixed", steps=[keyframe, tts, lipsync])

    wf.run(context=context, pre_step=_audio_to_lipsync_hook)

    statuses = [s.status for s in wf.steps]
    assert statuses == ["succeeded"] * len(statuses)
    assert wf.steps[0].receipt.modality == "image_t2i"
    assert wf.steps[1].receipt.modality == "audio_t2a"
    assert wf.steps[2].receipt.modality == "lipsync_post"


def test_cost_summable_across_receipts_in_3_step_workflow(tmp_path):
    """Per-receipt cost_usd values add up to a valid float total.

    Sums ``receipt.run_result.metadata['cost_usd']`` over all three steps,
    treating a missing or falsy value as 0.0.
    """
    context = DispatchContext(
        caller_id="workflow_test",
        step_runner=_StubStepRunner(),
        receipts_log_path="DISABLED",
    )
    keyframe = WorkflowStep(
        step_id="kf",
        modality="image_t2i",
        payload=dict(shot_id="X", prompt="p", model="nbp", aspect_ratio="9_16"),
    )
    tts = WorkflowStep(
        step_id="tts",
        modality="audio_t2a",
        payload=_audio_payload(tmp_path),
    )
    lipsync = WorkflowStep(
        step_id="lipsync",
        modality="lipsync_post",
        payload=_lipsync_payload(tmp_path),
        depends_on=["tts"],
    )
    wf = Workflow(workflow_id="wf_cost", steps=[keyframe, tts, lipsync])

    wf.run(context=context, pre_step=_audio_to_lipsync_hook)

    total = 0
    for executed in wf.steps:
        reported_cost = executed.receipt.run_result.metadata.get("cost_usd")
        total += float(reported_cost or 0.0)

    assert isinstance(total, float)
    # The image stub returns cost_usd=0.04; audio + lipsync return >= 0.
    assert total >= 0.04


def test_caller_supplied_audio_path_overrides_hook(tmp_path):
    """An explicit audio_path in the lipsync payload survives the hook.

    The caller provides an existing audio file; after the run the lipsync
    payload must still point at that file (not the tts output) and the step
    must have succeeded.
    """
    context = DispatchContext(
        caller_id="workflow_test",
        step_runner=_StubStepRunner(),
        receipts_log_path="DISABLED",
    )
    # The audio file the caller insists on using.
    caller_audio = tmp_path / "caller_supplied.mp3"
    caller_audio.write_bytes(b"FAKE_CALLER_SUPPLIED_AUDIO")
    caller_audio_str = str(caller_audio)

    payload_with_audio = _lipsync_payload(tmp_path, audio_path=caller_audio_str)
    tts = WorkflowStep(
        step_id="tts",
        modality="audio_t2a",
        payload=_audio_payload(tmp_path),
    )
    lipsync = WorkflowStep(
        step_id="lipsync",
        modality="lipsync_post",
        payload=payload_with_audio,
        depends_on=["tts"],
    )
    wf = Workflow(workflow_id="wf_explicit", steps=[tts, lipsync])

    wf.run(context=context, pre_step=_audio_to_lipsync_hook)

    # The hook must have left the caller's choice alone.
    assert wf.get_step("lipsync").payload["audio_path"] == caller_audio_str
    assert wf.steps[1].status == "succeeded"


def test_hook_is_noop_for_non_lipsync_steps(tmp_path):
    """The hook never injects audio_path into an audio_t2a step.

    Its first guard (`step.modality != 'lipsync_post'`) returns early, so the
    `tts` payload must not gain an audio_path key and the step must succeed.
    """
    context = DispatchContext(
        caller_id="workflow_test",
        step_runner=_StubStepRunner(),
        receipts_log_path="DISABLED",
    )
    pristine_audio_payload = {**_audio_payload(tmp_path)}
    tts = WorkflowStep(
        step_id="tts",
        modality="audio_t2a",
        # Hand the step its own shallow copy of the payload.
        payload=pristine_audio_payload.copy(),
    )
    lipsync = WorkflowStep(
        step_id="lipsync",
        modality="lipsync_post",
        payload=_lipsync_payload(tmp_path),
        depends_on=["tts"],
    )
    wf = Workflow(workflow_id="wf_noop", steps=[tts, lipsync])

    wf.run(context=context, pre_step=_audio_to_lipsync_hook)

    # The hook must not have added an audio_path key to the tts payload.
    payload_after_run = wf.get_step("tts").payload
    assert "audio_path" not in payload_after_run
    assert wf.steps[0].status == "succeeded"


def test_hook_defensive_no_op_when_upstream_tts_receipt_is_none(tmp_path):
    """Calling the hook before any run leaves the lipsync payload untouched.

    Covers the hook's `tts.receipt is None` guard, which exists for callers
    that invoke the hook out of band: on a never-run workflow the hook must
    neither raise nor add audio_path.
    """
    unrun_tts = WorkflowStep(
        step_id="tts",
        modality="audio_t2a",
        payload=_audio_payload(tmp_path),
    )
    initial_lipsync_payload = _lipsync_payload(tmp_path)
    unrun_lipsync = WorkflowStep(
        step_id="lipsync",
        modality="lipsync_post",
        payload=initial_lipsync_payload,
        depends_on=["tts"],
    )
    wf = Workflow(workflow_id="wf_defensive", steps=[unrun_tts, unrun_lipsync])

    # Nothing has executed yet, so the upstream step has no receipt.
    assert unrun_tts.receipt is None

    _audio_to_lipsync_hook(unrun_lipsync, wf)

    assert "audio_path" not in unrun_lipsync.payload
