# /Users/joeturnerlin/CLAUDE_PROJECTS/recoil/api/tests/test_wire_shape_e2e.py

"""Wire-shape e2e test — Phase 1 gate for console-v2-convergence-A.

This test MUST FAIL on Phase 1 with exactly 3 expected failures:
  1. episode_id_derived_from_filename_prefix fires (driver-beware navigation)
  2. episodes_synthesized_from_shot_index fires (tartarus/afterimage navigation)
  3. scenes_synthesized_one_per_episode fires (tartarus/afterimage navigation)

Phases 2-6 fix the data + remove the 3 deletion-target fallbacks. Phase 4
makes this test PASS. Phase 7 runs the Playwright acceptance set against
the running stack.

Reads real engine state from projects/{tartarus,driver-beware,afterimage}/state/.
No mocking, no fixtures — the SSOT for shot files is the filesystem at
projects_root(). httpx.AsyncClient with ASGITransport hits the FastAPI app
in-process; uvicorn is not required.

The 4-stage navigation: projects -> episodes -> beats -> takes. Every response
parses through its Pydantic model (Project, Episode, Beat, Take) — a parse
failure IS a test failure (catches wire-shape drift).

Fallback detection: BUS.history() snapshot before/after; diff is the events
emitted during this test session. Filter where severity=='fallback'.
"""
from __future__ import annotations

import asyncio
from typing import Set

import pytest
from httpx import ASGITransport, AsyncClient

from recoil.api.eventbus import BUS
from recoil.api.main import app
from recoil.api.schemas.engine import (
    Beat,
    Episode,
    Project,
    Take,
)

# The 3 disguised-default fallbacks that Build A deletes.
DEAD_FALLBACK_NAMES: Set[str] = {
    "episode_id_derived_from_filename_prefix",
    "episodes_synthesized_from_shot_index",
    "scenes_synthesized_one_per_episode",
}

# The 3 projects to exercise. tartarus + afterimage are microdrama; driver-beware
# is client_video (supports_episodes=False — must return episodes=[] post-Build A).
EXPECTED_PROJECTS = {"tartarus", "driver-beware", "afterimage"}


@pytest.fixture(autouse=True)
def _bind_bus_loop():
    """Bind BUS to the running test loop so emit_sync works synchronously.

    Build A Phase 4: defer loop resolution to tolerate pytest-asyncio's
    per-test loop policy — get_event_loop() raises after the previous
    asyncio test's loop is torn down. We fall back to a fresh loop;
    BUS.bind_loop is idempotent for re-binding.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    BUS.bind_loop(loop)
    yield


def _bus_event_diff(before_id: str | None) -> list:
    """Events appended since the `before_id` watermark."""
    return BUS.history(after_id=before_id)


def _dead_fallback_events(events: list) -> list[str]:
    """Return names of dead-fallback events present in `events`."""
    hits: list[str] = []
    for ev in events:
        if ev.severity != "fallback":
            continue
        if ev.summary in DEAD_FALLBACK_NAMES:
            hits.append(ev.summary)
    return hits


@pytest.mark.asyncio
async def test_wire_shape_e2e_no_dead_fallbacks():
    """End-to-end navigation across 3 projects must not fire any dead fallback."""
    # Snapshot BUS history watermark BEFORE the navigation.
    pre_history = BUS.history()
    watermark = pre_history[-1].id if pre_history else None

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        # ── Stage 1: list projects ──────────────────────────────────────────
        r = await client.get("/api/projects")
        assert r.status_code == 200, f"GET /api/projects failed: {r.status_code} {r.text}"
        projects = [Project.model_validate(p) for p in r.json()]
        project_ids = {p.id for p in projects}
        assert EXPECTED_PROJECTS.issubset(project_ids), (
            f"Expected projects {EXPECTED_PROJECTS} missing from response: got {project_ids}"
        )

        # ── Stage 2: list episodes for each project ─────────────────────────
        # tartarus + afterimage must return >=1 episode.
        # driver-beware (client_video) must return [] without firing any fallback.
        episodes_per_project: dict[str, list[Episode]] = {}
        for proj_id in EXPECTED_PROJECTS:
            r = await client.get(f"/api/projects/{proj_id}/episodes")
            assert r.status_code == 200, (
                f"GET /api/projects/{proj_id}/episodes failed: {r.status_code} {r.text}"
            )
            episodes = [Episode.model_validate(e) for e in r.json()]
            episodes_per_project[proj_id] = episodes

        assert len(episodes_per_project["tartarus"]) >= 1, "tartarus has no episodes"
        assert len(episodes_per_project["afterimage"]) >= 1, "afterimage has no episodes"
        assert episodes_per_project["driver-beware"] == [], (
            "driver-beware (client_video) must return episodes=[] without fallback"
        )

        # ── Stage 3: navigate one episode of tartarus + afterimage to beats ─
        # Synthetic scene id format from adapters/beats.py:373-384
        for proj_id in ("tartarus", "afterimage"):
            ep_id = episodes_per_project[proj_id][0].id
            scene_id = f"{ep_id}__synthetic_scene_1"
            r = await client.get(
                f"/api/projects/{proj_id}/episodes/{ep_id}/scenes/{scene_id}/beats"
            )
            assert r.status_code == 200, (
                f"GET beats for {proj_id}/{ep_id}/{scene_id} failed: "
                f"{r.status_code} {r.text}"
            )
            beats = [Beat.model_validate(b) for b in r.json()]
            assert len(beats) >= 1, f"{proj_id}/{ep_id} has no beats"

            # ── Stage 4: takes for the first beat ───────────────────────────
            beat_id = beats[0].id
            r = await client.get(
                f"/api/beats/{beat_id}/takes",
                params={"projectId": proj_id},
            )
            assert r.status_code == 200, (
                f"GET takes for {beat_id}@{proj_id} failed: {r.status_code} {r.text}"
            )
            takes = [Take.model_validate(t) for t in r.json()]
            # takes MAY be empty (a beat with no takes is legitimate);
            # the wire-shape assertion is that parsing succeeded.
            assert all(t.beat_id == beat_id for t in takes), "take.beat_id mismatch"

    # ── Final assertion: BUS history diff has no dead-fallback events ───────
    post_events = _bus_event_diff(watermark)
    dead_hits = _dead_fallback_events(post_events)
    assert dead_hits == [], (
        f"Dead fallback(s) fired during e2e navigation: {dead_hits}. "
        f"Build A Phase 4 must delete: {DEAD_FALLBACK_NAMES}. "
        f"Full event diff ({len(post_events)} events): "
        f"{[(e.severity, e.summary) for e in post_events[:20]]}"
    )


@pytest.mark.asyncio
async def test_wire_shape_legitimate_fallbacks_still_allowed():
    """The 18 legitimate fallbacks (cache miss, ffprobe timeout, etc.) MUST still
    fire when appropriate. This test pins that we didn't accidentally delete the
    wrong entries from the registry."""
    from recoil.pipeline._lib.sanctioned_fallbacks import list_sanctioned_fallbacks

    registry = {r.name for r in list_sanctioned_fallbacks()}

    # These 18 must remain after Build A.
    legitimate = {
        "model_alias_resolver",
        "cache_miss_canonical_source",
        "cost_unknown_telemetry_zero",
        "receipts_log_corrupt_line_skip",
        "shot_file_unreadable_drop",
        "take_id_not_on_disk",
        "proposal_id_not_pending",
        "workspace_state_user_chose_discard",
        "workspace_state_user_chose_report",
        "project_load_failure_isolated",
        "take_status_unsignaled_default_queued",
        "take_eval_state_unsignaled_default_pending",
        "step_runner_outer_failure_store_update_skip",
        "step_runner_batch_update_skip",
        "step_runner_nested_cleanup_skip",
        "step_runner_sidecar_best_effort_skip",
        "step_runner_post_step_finalize_skip",
        "recent_id_derivation_failed",
    }
    missing = legitimate - registry
    assert not missing, f"Legitimate fallbacks accidentally deleted: {missing}"

    # These 3 must be ABSENT after Build A Phase 4.
    assert DEAD_FALLBACK_NAMES.isdisjoint(registry), (
        f"Dead fallbacks still in registry: {DEAD_FALLBACK_NAMES & registry}"
    )
